You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/04/22 11:13:50 UTC

[ignite] branch master updated: IGNITE-14565 Added additional update counter logging for detecting of AssertionError: LWM after HWM. Fixes #9011

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 79add25  IGNITE-14565 Added additional update counter logging for detecting of AssertionError: LWM after HWM. Fixes #9011
79add25 is described below

commit 79add251b333ea56a59bd49c688151a36ac3e0e3
Author: Makedonskaya <m....@gmail.com>
AuthorDate: Thu Apr 22 14:13:07 2021 +0300

    IGNITE-14565 Added additional update counter logging for detecting of AssertionError: LWM after HWM. Fixes #9011
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../cache/IgniteCacheOffheapManagerImpl.java       |   2 +-
 .../cache/PartitionUpdateCounterErrorWrapper.java  | 182 +++++++++++++++++++++
 .../cache/PartitionUpdateCounterTrackingImpl.java  |  31 +++-
 ...IgnitePdsSpuriousRebalancingOnNodeJoinTest.java |  10 +-
 .../transactions/PartitionUpdateCounterTest.java   |   9 +-
 ...ounterStateOnePrimaryTwoBackupsFailAllTest.java |   4 +-
 6 files changed, 228 insertions(+), 10 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 71aca1d..82f9a00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1536,7 +1536,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                     new PartitionUpdateCounterTrackingImpl(grp);
 
             pCntr = grp.shared().logger(PartitionUpdateCounterDebugWrapper.class).isDebugEnabled() ?
-                new PartitionUpdateCounterDebugWrapper(partId, delegate) : delegate;
+                new PartitionUpdateCounterDebugWrapper(partId, delegate) : new PartitionUpdateCounterErrorWrapper(partId, delegate);
 
             updateValSizeThreshold = grp.shared().database().pageSize() / 2;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterErrorWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterErrorWrapper.java
new file mode 100644
index 0000000..b1cf59f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterErrorWrapper.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Iterator;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Update counter wrapper that logs cache group and partition details when {@code reserve} throws an {@link AssertionError}.
+ */
+public class PartitionUpdateCounterErrorWrapper implements PartitionUpdateCounter {
+    /** Logger obtained from the group's shared context for this wrapper's class. */
+    private final IgniteLogger log;
+
+    /** Partition id, reported in the error message. */
+    private final int partId;
+
+    /** Cache group context taken from the delegate; source of the group details in the error message. */
+    private final CacheGroupContext grp;
+
+    /** Wrapped update counter that the calls of this class are forwarded to. */
+    private final PartitionUpdateCounter delegate;
+
+    /**
+     * @param partId Partition id.
+     * @param delegate Update counter to wrap; its {@code context()} supplies the cache group and the logger.
+     */
+    public PartitionUpdateCounterErrorWrapper(int partId, PartitionUpdateCounter delegate) {
+        this.partId = partId;
+        this.grp = delegate.context();
+        this.log = grp.shared().logger(getClass());
+        this.delegate = delegate;
+    }
+
+    /** {@inheritDoc} If the delegate throws an {@link AssertionError}, group and partition details are logged and it is rethrown. */
+    @Override public long reserve(long delta) {
+        try {
+            return delegate.reserve(delta);
+        } catch (AssertionError e) {
+            SB sb = new SB();
+
+            sb.a("Failed to increment HWM ")
+                .a("[op=reserve")
+                .a(", grpId=").a(grp.groupId())
+                .a(", grpName=").a(grp.cacheOrGroupName())
+                .a(", caches=").a(grp.caches())
+                .a(", atomicity=").a(grp.config().getAtomicityMode())
+                .a(", syncMode=").a(grp.config().getWriteSynchronizationMode())
+                .a(", mode=").a(grp.config().getCacheMode())
+                .a(", partId=").a(partId)
+                .a(", delta=").a(delta)
+                .a("]");
+
+            U.error(log, sb.toString());
+
+            throw e; // Rethrown unchanged: this wrapper only adds the log record.
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void init(long initUpdCntr, @Nullable byte[] cntrUpdData) {
+        delegate.init(initUpdCntr, cntrUpdData);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long next() {
+        return delegate.next();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long next(long delta) {
+        return delegate.next(delta);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void update(long val) throws IgniteCheckedException {
+        delegate.update(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean update(long start, long delta) {
+        return delegate.update(start, delta);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void reset() {
+        delegate.reset();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void updateInitial(long start, long delta) {
+        delegate.updateInitial(start, delta);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridLongList finalizeUpdateCounters() {
+        return delegate.finalizeUpdateCounters();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resetInitialCounter() {
+        delegate.resetInitialCounter();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long initial() {
+        return delegate.initial();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long get() {
+        return delegate.get();
+    }
+
+    /** {@inheritDoc} Equal only to another error wrapper whose delegate is equal. NOTE(review): no hashCode() override accompanies this. */
+    @Override public boolean equals(Object o) {
+        if (o instanceof PartitionUpdateCounterErrorWrapper)
+            return delegate.equals(((PartitionUpdateCounterErrorWrapper)o).delegate);
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long reserved() {
+        return delegate.reserved();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public byte[] getBytes() {
+        return delegate.getBytes();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean sequential() {
+        return delegate.sequential();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean empty() {
+        return delegate.empty();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterator<long[]> iterator() {
+        return delegate.iterator();
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheGroupContext context() {
+        return delegate.context();
+    }
+
+    /** {@inheritDoc} The copy is a new error wrapper around a copy of the delegate, with the same partition id. */
+    @Override public PartitionUpdateCounter copy() {
+        return new PartitionUpdateCounterErrorWrapper(partId, delegate.copy());
+    }
+
+    /** {@inheritDoc} Returns the delegate's string form. */
+    @Override public String toString() {
+        return delegate.toString();
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
index 0f9d72c..f4e7a06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -259,7 +260,7 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
 
         long reserved = reserveCntr.getAndAdd(delta);
 
-        assert reserved >= cntr : "LWM after HWM: lwm=" + cntr + ", hwm=" + reserved;
+        assert reserved >= cntr : "LWM after HWM: lwm=" + cntr + ", hwm=" + reserved + ", cntr=" + toString();
 
         return reserved;
     }
@@ -446,8 +447,32 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return "Counter [lwm=" + get() + ", holes=" + queue +
-            ", maxApplied=" + highestAppliedCounter() + ", hwm=" + reserveCntr.get() + ']';
+        String queueStr;
+        long lwm;
+        long hwm;
+        long maxApplied;
+
+        synchronized (this) { // NOTE(review): presumably so all four values are read as one consistent snapshot - confirm.
+            queueStr = queue.toString();
+
+            lwm = get();
+
+            hwm = reserveCntr.get();
+
+            maxApplied = highestAppliedCounter();
+        }
+
+        return new SB()
+            .a("Counter [lwm=")
+            .a(lwm)
+            .a(", holes=")
+            .a(queueStr)
+            .a(", maxApplied=")
+            .a(maxApplied)
+            .a(", hwm=")
+            .a(hwm)
+            .a(']')
+            .toString();
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
index 564a728..960fa7f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
@@ -31,6 +31,8 @@ import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
+import org.apache.ignite.internal.processors.cache.PartitionUpdateCounterErrorWrapper;
 import org.apache.ignite.internal.processors.cache.PartitionUpdateCounterTrackingImpl;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
@@ -146,9 +148,13 @@ public class IgnitePdsSpuriousRebalancingOnNodeJoinTest extends GridCommonAbstra
 
             assertNotNull(part);
 
-            PartitionUpdateCounterTrackingImpl cntr0 = (PartitionUpdateCounterTrackingImpl)part.dataStore().partUpdateCounter();
+            PartitionUpdateCounter cntr0 = part.dataStore().partUpdateCounter();
 
-            AtomicLong cntr = U.field(cntr0, "cntr");
+            assertTrue(cntr0 instanceof PartitionUpdateCounterErrorWrapper);
+
+            PartitionUpdateCounterTrackingImpl delegate = U.field(cntr0, "delegate");
+
+            AtomicLong cntr = U.field(delegate, "cntr");
 
             cntr.set(cntr.get() - 1);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/PartitionUpdateCounterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/PartitionUpdateCounterTest.java
index 8e12266..32c7277 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/PartitionUpdateCounterTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/PartitionUpdateCounterTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
+import org.apache.ignite.internal.processors.cache.PartitionUpdateCounterErrorWrapper;
 import org.apache.ignite.internal.processors.cache.PartitionUpdateCounterTrackingImpl;
 import org.apache.ignite.internal.processors.cache.PartitionUpdateCounterVolatileImpl;
 import org.apache.ignite.internal.util.typedef.X;
@@ -404,10 +405,14 @@ public class PartitionUpdateCounterTest extends GridCommonAbstractTest {
 
             PartitionUpdateCounter cntr = counter(0, grid0.name());
 
+            assertTrue(cntr instanceof PartitionUpdateCounterErrorWrapper);
+
+            PartitionUpdateCounter delegate = U.field(cntr, "delegate");
+
             if (mode == CacheAtomicityMode.TRANSACTIONAL)
-                assertTrue(cntr instanceof PartitionUpdateCounterTrackingImpl);
+                assertTrue(delegate instanceof PartitionUpdateCounterTrackingImpl);
             else if (mode == CacheAtomicityMode.ATOMIC)
-                assertTrue(cntr instanceof PartitionUpdateCounterVolatileImpl);
+                assertTrue(delegate instanceof PartitionUpdateCounterVolatileImpl);
 
             assertEquals(cntr.initial(), cntr.get());
         }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest.java
index c2ec2a0..15e9a37 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest.java
@@ -26,7 +26,7 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.NodeStoppingException;
-import org.apache.ignite.internal.processors.cache.PartitionUpdateCounterTrackingImpl;
+import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
@@ -345,7 +345,7 @@ public class TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest extends TxPa
         if (expectAliveNodes == 1) {
             IgniteEx node = (IgniteEx)G.allGrids().iterator().next();
 
-            PartitionUpdateCounterTrackingImpl cntr = (PartitionUpdateCounterTrackingImpl)counter(PARTITION_ID, node.name());
+            PartitionUpdateCounter cntr = counter(PARTITION_ID, node.name());
 
             assertTrue(cntr.sequential());
         }