You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/04/22 11:13:50 UTC
[ignite] branch master updated: IGNITE-14565 Added additional
update counter logging for detecting of AssertionError: LWM after HWM.
Fixes #9011
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 79add25 IGNITE-14565 Added additional update counter logging for detecting of AssertionError: LWM after HWM. Fixes #9011
79add25 is described below
commit 79add251b333ea56a59bd49c688151a36ac3e0e3
Author: Makedonskaya <m....@gmail.com>
AuthorDate: Thu Apr 22 14:13:07 2021 +0300
IGNITE-14565 Added additional update counter logging for detecting of AssertionError: LWM after HWM. Fixes #9011
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../cache/IgniteCacheOffheapManagerImpl.java | 2 +-
.../cache/PartitionUpdateCounterErrorWrapper.java | 182 +++++++++++++++++++++
.../cache/PartitionUpdateCounterTrackingImpl.java | 31 +++-
...IgnitePdsSpuriousRebalancingOnNodeJoinTest.java | 10 +-
.../transactions/PartitionUpdateCounterTest.java | 9 +-
...ounterStateOnePrimaryTwoBackupsFailAllTest.java | 4 +-
6 files changed, 228 insertions(+), 10 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 71aca1d..82f9a00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1536,7 +1536,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
new PartitionUpdateCounterTrackingImpl(grp);
pCntr = grp.shared().logger(PartitionUpdateCounterDebugWrapper.class).isDebugEnabled() ?
- new PartitionUpdateCounterDebugWrapper(partId, delegate) : delegate;
+ new PartitionUpdateCounterDebugWrapper(partId, delegate) : new PartitionUpdateCounterErrorWrapper(partId, delegate);
updateValSizeThreshold = grp.shared().database().pageSize() / 2;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterErrorWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterErrorWrapper.java
new file mode 100644
index 0000000..b1cf59f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterErrorWrapper.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Iterator;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Update counter wrapper for error logging.
+ */
+public class PartitionUpdateCounterErrorWrapper implements PartitionUpdateCounter {
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private final int partId;
+
+ /** */
+ private final CacheGroupContext grp;
+
+ /** */
+ private final PartitionUpdateCounter delegate;
+
+ /**
+ * @param partId Part id.
+ * @param delegate Delegate.
+ */
+ public PartitionUpdateCounterErrorWrapper(int partId, PartitionUpdateCounter delegate) {
+ this.partId = partId;
+ this.grp = delegate.context();
+ this.log = grp.shared().logger(getClass());
+ this.delegate = delegate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long reserve(long delta) {
+ try {
+ return delegate.reserve(delta);
+ } catch (AssertionError e) {
+ SB sb = new SB();
+
+ sb.a("Failed to increment HWM ")
+ .a("[op=reserve")
+ .a(", grpId=").a(grp.groupId())
+ .a(", grpName=").a(grp.cacheOrGroupName())
+ .a(", caches=").a(grp.caches())
+ .a(", atomicity=").a(grp.config().getAtomicityMode())
+ .a(", syncMode=").a(grp.config().getWriteSynchronizationMode())
+ .a(", mode=").a(grp.config().getCacheMode())
+ .a(", partId=").a(partId)
+ .a(", delta=").a(delta)
+ .a("]");
+
+ U.error(log, sb.toString());
+
+ throw e;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void init(long initUpdCntr, @Nullable byte[] cntrUpdData) {
+ delegate.init(initUpdCntr, cntrUpdData);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long next() {
+ return delegate.next();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long next(long delta) {
+ return delegate.next(delta);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void update(long val) throws IgniteCheckedException {
+ delegate.update(val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean update(long start, long delta) {
+ return delegate.update(start, delta);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ delegate.reset();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void updateInitial(long start, long delta) {
+ delegate.updateInitial(start, delta);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridLongList finalizeUpdateCounters() {
+ return delegate.finalizeUpdateCounters();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void resetInitialCounter() {
+ delegate.resetInitialCounter();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long initial() {
+ return delegate.initial();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long get() {
+ return delegate.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (o instanceof PartitionUpdateCounterErrorWrapper)
+ return delegate.equals(((PartitionUpdateCounterErrorWrapper)o).delegate);
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long reserved() {
+ return delegate.reserved();
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public byte[] getBytes() {
+ return delegate.getBytes();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean sequential() {
+ return delegate.sequential();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean empty() {
+ return delegate.empty();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<long[]> iterator() {
+ return delegate.iterator();
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheGroupContext context() {
+ return delegate.context();
+ }
+
+ /** {@inheritDoc} */
+ @Override public PartitionUpdateCounter copy() {
+ return new PartitionUpdateCounterErrorWrapper(partId, delegate.copy());
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return delegate.toString();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
index 0f9d72c..f4e7a06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.SB;
import org.jetbrains.annotations.Nullable;
/**
@@ -259,7 +260,7 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
long reserved = reserveCntr.getAndAdd(delta);
- assert reserved >= cntr : "LWM after HWM: lwm=" + cntr + ", hwm=" + reserved;
+ assert reserved >= cntr : "LWM after HWM: lwm=" + cntr + ", hwm=" + reserved + ", cntr=" + toString();
return reserved;
}
@@ -446,8 +447,32 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
/** {@inheritDoc} */
@Override public String toString() {
- return "Counter [lwm=" + get() + ", holes=" + queue +
- ", maxApplied=" + highestAppliedCounter() + ", hwm=" + reserveCntr.get() + ']';
+ String quequeStr;
+ long lwm;
+ long hwm;
+ long maxApplied;
+
+ synchronized (this) {
+ quequeStr = queue.toString();
+
+ lwm = get();
+
+ hwm = reserveCntr.get();
+
+ maxApplied = highestAppliedCounter();
+ }
+
+ return new SB()
+ .a("Counter [lwm=")
+ .a(lwm)
+ .a(", holes=")
+ .a(quequeStr)
+ .a(", maxApplied=")
+ .a(maxApplied)
+ .a(", hwm=")
+ .a(hwm)
+ .a(']')
+ .toString();
}
/** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
index 564a728..960fa7f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
@@ -31,6 +31,8 @@ import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
+import org.apache.ignite.internal.processors.cache.PartitionUpdateCounterErrorWrapper;
import org.apache.ignite.internal.processors.cache.PartitionUpdateCounterTrackingImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
@@ -146,9 +148,13 @@ public class IgnitePdsSpuriousRebalancingOnNodeJoinTest extends GridCommonAbstra
assertNotNull(part);
- PartitionUpdateCounterTrackingImpl cntr0 = (PartitionUpdateCounterTrackingImpl)part.dataStore().partUpdateCounter();
+ PartitionUpdateCounter cntr0 = part.dataStore().partUpdateCounter();
- AtomicLong cntr = U.field(cntr0, "cntr");
+ assertTrue(cntr0 instanceof PartitionUpdateCounterErrorWrapper);
+
+ PartitionUpdateCounterTrackingImpl delegate = U.field(cntr0, "delegate");
+
+ AtomicLong cntr = U.field(delegate, "cntr");
cntr.set(cntr.get() - 1);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/PartitionUpdateCounterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/PartitionUpdateCounterTest.java
index 8e12266..32c7277 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/PartitionUpdateCounterTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/PartitionUpdateCounterTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
+import org.apache.ignite.internal.processors.cache.PartitionUpdateCounterErrorWrapper;
import org.apache.ignite.internal.processors.cache.PartitionUpdateCounterTrackingImpl;
import org.apache.ignite.internal.processors.cache.PartitionUpdateCounterVolatileImpl;
import org.apache.ignite.internal.util.typedef.X;
@@ -404,10 +405,14 @@ public class PartitionUpdateCounterTest extends GridCommonAbstractTest {
PartitionUpdateCounter cntr = counter(0, grid0.name());
+ assertTrue(cntr instanceof PartitionUpdateCounterErrorWrapper);
+
+ PartitionUpdateCounter delegate = U.field(cntr, "delegate");
+
if (mode == CacheAtomicityMode.TRANSACTIONAL)
- assertTrue(cntr instanceof PartitionUpdateCounterTrackingImpl);
+ assertTrue(delegate instanceof PartitionUpdateCounterTrackingImpl);
else if (mode == CacheAtomicityMode.ATOMIC)
- assertTrue(cntr instanceof PartitionUpdateCounterVolatileImpl);
+ assertTrue(delegate instanceof PartitionUpdateCounterVolatileImpl);
assertEquals(cntr.initial(), cntr.get());
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest.java
index c2ec2a0..15e9a37 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest.java
@@ -26,7 +26,7 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.NodeStoppingException;
-import org.apache.ignite.internal.processors.cache.PartitionUpdateCounterTrackingImpl;
+import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
@@ -345,7 +345,7 @@ public class TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest extends TxPa
if (expectAliveNodes == 1) {
IgniteEx node = (IgniteEx)G.allGrids().iterator().next();
- PartitionUpdateCounterTrackingImpl cntr = (PartitionUpdateCounterTrackingImpl)counter(PARTITION_ID, node.name());
+ PartitionUpdateCounter cntr = counter(PARTITION_ID, node.name());
assertTrue(cntr.sequential());
}