You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2022/02/27 18:46:53 UTC

[ignite] branch master updated: IGNITE-16621 Fixed an issue with IgniteAtomicSequence that led to AssertionError. Fixes #9847

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d900d1  IGNITE-16621 Fixed an issue with IgniteAtomicSequence that led to AssertionError. Fixes #9847
2d900d1 is described below

commit 2d900d1b56909b5ec98e2a9b65237d0e69c5da62
Author: Slava Koptilin <sl...@gmail.com>
AuthorDate: Sun Feb 27 21:46:12 2022 +0300

    IGNITE-16621 Fixed an issue with IgniteAtomicSequence that led to AssertionError. Fixes #9847
---
 .../datastructures/AtomicDataStructureProxy.java   |   8 +-
 .../GridCacheAtomicSequenceImpl.java               |  54 +++----
 .../internal/IgniteClientReconnectAtomicsTest.java |  17 +-
 ...ientReconnectAtomicsWithLostPartitionsTest.java | 171 +++++++++++++++------
 .../GridCacheSequenceApiSelfAbstractTest.java      |   3 +-
 5 files changed, 161 insertions(+), 92 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/AtomicDataStructureProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/AtomicDataStructureProxy.java
index 62bed72..145768a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/AtomicDataStructureProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/AtomicDataStructureProxy.java
@@ -114,9 +114,9 @@ public abstract class AtomicDataStructureProxy<V extends AtomicDataStructureValu
     /**
      * Check removed status.
      *
-     * @throws IllegalStateException If removed.
+     * @throws IgniteException If removed.
      */
-    protected void checkRemoved() throws IllegalStateException {
+    protected void checkRemoved() throws IgniteException {
         if (rmvd)
             throw removedError();
 
@@ -175,8 +175,8 @@ public abstract class AtomicDataStructureProxy<V extends AtomicDataStructureValu
     /**
      * @return Error.
      */
-    private IllegalStateException removedError() {
-        return new IllegalStateException("Sequence was removed from cache: " + name);
+    private IgniteException removedError() {
+        return new IgniteException("Sequence was removed from cache: " + name);
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 43b53c6..55f7878 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -26,6 +26,7 @@ import java.io.ObjectStreamException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import javax.cache.processor.EntryProcessorException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.GridKernalContext;
@@ -132,46 +133,26 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
 
     /** {@inheritDoc} */
     @Override public long incrementAndGet() {
-        try {
-            return internalUpdate(1, incAndGetCall, true);
-        }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
-        }
+        return internalUpdate(1, incAndGetCall, true);
     }
 
     /** {@inheritDoc} */
     @Override public long getAndIncrement() {
-        try {
-            return internalUpdate(1, getAndIncCall, false);
-        }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
-        }
+        return internalUpdate(1, getAndIncCall, false);
     }
 
     /** {@inheritDoc} */
     @Override public long addAndGet(long l) {
         A.ensure(l > 0, " Parameter mustn't be less then 1: " + l);
 
-        try {
-            return internalUpdate(l, null, true);
-        }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
-        }
+        return internalUpdate(l, null, true);
     }
 
     /** {@inheritDoc} */
     @Override public long getAndAdd(long l) {
         A.ensure(l > 0, " Parameter mustn't be less then 1: " + l);
 
-        try {
-            return internalUpdate(l, null, false);
-        }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
-        }
+        return internalUpdate(l, null, false);
     }
 
     /**
@@ -182,17 +163,22 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
      * @param updated If {@code true}, will return sequence value after update, otherwise will return sequence value
      *      prior to update.
      * @return Sequence value.
-     * @throws IgniteCheckedException If update failed.
+     * @throws IgniteException If update failed.
      */
-    private long internalUpdate(long l, @Nullable Callable<Long> updateCall, boolean updated) throws IgniteCheckedException {
+    private long internalUpdate(long l, @Nullable Callable<Long> updateCall, boolean updated) throws IgniteException {
         checkRemoved();
 
         assert l > 0;
 
         if (ctx.shared().readOnlyMode()) {
-            throw new CacheInvalidStateException(new IgniteClusterReadOnlyException(
-                String.format(CLUSTER_READ_ONLY_MODE_ERROR_MSG_FORMAT, "sequence", ctx.group().name(), ctx.name())
-            ));
+            throw U.convertException(
+                new CacheInvalidStateException(
+                    new IgniteClusterReadOnlyException(
+                        String.format(CLUSTER_READ_ONLY_MODE_ERROR_MSG_FORMAT,
+                        "sequence",
+                        ctx.group().name(),
+                        ctx.name())
+            )));
         }
 
         localUpdate.lock();
@@ -230,11 +216,8 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
             try {
                 return CU.retryTopologySafe(updateCall);
             }
-            catch (IgniteCheckedException | IgniteException | IllegalStateException e) {
-                throw e;
-            }
             catch (Exception e) {
-                throw new IgniteCheckedException(e);
+                throw checkRemovedAfterFail(e);
             }
         }
         finally {
@@ -305,7 +288,10 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
 
                     checkRemoved();
 
-                    assert seq != null;
+                    if (seq == null) {
+                        // This is the case when partition is lost and partition loss policy is IGNORE.
+                        throw new EntryProcessorException("Failed to find atomic sequence with the given name: " + key.name());
+                    }
 
                     long curLocVal;
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index 5dbf9bf..f1cc274 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.IgniteAtomicSequence;
 import org.apache.ignite.IgniteAtomicStamped;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLock;
 import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
@@ -78,7 +79,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
                 return null;
             }
-        }, IllegalStateException.class, null);
+        }, IgniteException.class, null);
 
         GridTestUtils.assertThrows(log, new Callable<Void>() {
             @Override public Void call() throws Exception {
@@ -86,7 +87,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
                 return null;
             }
-        }, IllegalStateException.class, null);
+        }, IgniteException.class, null);
 
         GridTestUtils.assertThrows(log, new Callable<Void>() {
             @Override public Void call() throws Exception {
@@ -94,7 +95,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
                 return null;
             }
-        }, IllegalStateException.class, null);
+        }, IgniteException.class, null);
 
         GridTestUtils.assertThrows(log, new Callable<Void>() {
             @Override public Void call() throws Exception {
@@ -102,7 +103,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
                 return null;
             }
-        }, IllegalStateException.class, null);
+        }, IgniteException.class, null);
     }
 
     /**
@@ -177,7 +178,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
                 return null;
             }
-        }, IllegalStateException.class, null);
+        }, IgniteException.class, null);
 
         IgniteAtomicSequence newClientAtomicSeq = client.atomicSequence("atomicSeqRmv", 0, true);
 
@@ -328,7 +329,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
                 return null;
             }
-        }, IllegalStateException.class, null);
+        }, IgniteException.class, null);
 
         IgniteAtomicReference<String> newClientAtomicRef =
             client.atomicReference("atomicRefRemoved", "1st value", true);
@@ -491,7 +492,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
                 return null;
             }
-        }, IllegalStateException.class, null);
+        }, IgniteException.class, null);
 
         IgniteAtomicStamped newClientAtomicStamped = client.atomicStamped("atomicStampedRemoved", 0, 0, true);
 
@@ -640,7 +641,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
                 return null;
             }
-        }, IllegalStateException.class, null);
+        }, IgniteException.class, null);
 
         IgniteAtomicLong newClientAtomicLong = client.atomicLong("atomicLongRmv", 0, true);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsWithLostPartitionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsWithLostPartitionsTest.java
index 72ce8ac..9f67d84 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsWithLostPartitionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsWithLostPartitionsTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteAtomicLong;
 import org.apache.ignite.IgniteAtomicReference;
+import org.apache.ignite.IgniteAtomicSequence;
 import org.apache.ignite.IgniteAtomicStamped;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
@@ -222,15 +223,59 @@ public class IgniteClientReconnectAtomicsWithLostPartitionsTest extends IgniteCl
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testAtomicSequenceAddAngGet() throws Exception {
+        testAtomicSequenceReconnectClusterRestart("atomic-sequence-addAndGet", atomic -> atomic.addAndGet(5L));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testAtomicSequenceGetAngAdd() throws Exception {
+        testAtomicSequenceReconnectClusterRestart("atomic-sequence-getAndAdd", atomic -> atomic.getAndAdd(5L));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testAtomicSequenceIncrementAndGet() throws Exception {
+        testAtomicSequenceReconnectClusterRestart(
+            "atomic-sequence-incrementAndGet",
+            atomic -> {
+                // Need to execute twice at least. See AtomicConfiguration.setAtomicSequenceReserveSize.
+                atomic.incrementAndGet();
+                atomic.incrementAndGet();
+            });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testAtomicSequenceGetAndIncrement() throws Exception {
+        testAtomicSequenceReconnectClusterRestart(
+            "atomic-sequence-getAndIncrement",
+            atomic -> {
+                // Need to execute twice at least. See AtomicConfiguration.setAtomicSequenceReserveSize.
+                atomic.getAndIncrement();
+                atomic.getAndIncrement();
+            });
+    }
+
+    /**
      * Tests atomic long operation provided by the the given {@code clo}.
      *
      * @param atomicName Name of atomic long.
-     * @param clo Closure that represents an operation.
+     * @param op Closure that represents an operation.
      * @throws Exception If failed.
      */
     private void testAtomicLongReconnectClusterRestart(
         String atomicName,
-        final IgniteInClosure<IgniteAtomicLong> clo
+        final IgniteInClosure<IgniteAtomicLong> op
     ) throws Exception {
         Ignite client = grid(serverCount());
 
@@ -244,21 +289,9 @@ public class IgniteClientReconnectAtomicsWithLostPartitionsTest extends IgniteCl
 
         // Restart the cluster without waiting for rebalancing.
         // It should lead to data loss because there are no backups in the atomic configuration.
-        for (int i = 0; i < serverCount(); ++i) {
-            grid(i).close();
+        restartClusterWithoutRebalancing();
 
-            startGrid(i);
-        }
-
-        GridTestUtils.assertThrows(
-            log,
-            () -> {
-                clo.apply(atomic);
-
-                return null;
-            },
-            IgniteException.class,
-            "Failed to find atomic long: " + atomicName);
+        checkAtomicOperation(atomic, op, "Failed to find atomic long: " + atomicName);
 
         assertTrue("Atomic long instance should be removed.", atomic.removed());
 
@@ -271,12 +304,12 @@ public class IgniteClientReconnectAtomicsWithLostPartitionsTest extends IgniteCl
      * Tests atomic reference operation provided by the the given {@code clo}.
      *
      * @param atomicName Name of atomic.
-     * @param clo Closure that represents an operation.
+     * @param op Closure that represents an operation.
      * @throws Exception If failed.
      */
     private void testAtomicReferenceReconnectClusterRestart(
         String atomicName,
-        final IgniteInClosure<IgniteAtomicReference<Long>> clo
+        final IgniteInClosure<IgniteAtomicReference<Long>> op
     ) throws Exception {
         Ignite client = grid(serverCount());
 
@@ -290,21 +323,9 @@ public class IgniteClientReconnectAtomicsWithLostPartitionsTest extends IgniteCl
 
         // Restart the cluster without waiting for rebalancing.
         // It should lead to data loss because there are no backups in the atomic configuration.
-        for (int i = 0; i < serverCount(); ++i) {
-            grid(i).close();
-
-            startGrid(i);
-        }
+        restartClusterWithoutRebalancing();
 
-        GridTestUtils.assertThrows(
-            log,
-            () -> {
-                clo.apply(atomic);
-
-                return null;
-            },
-            IgniteException.class,
-            "Failed to find atomic reference with given name: " + atomicName);
+        checkAtomicOperation(atomic, op, "Failed to find atomic reference with given name: " + atomicName);
 
         assertTrue("Atomic instance should be removed.", atomic.removed());
 
@@ -340,6 +361,67 @@ public class IgniteClientReconnectAtomicsWithLostPartitionsTest extends IgniteCl
         assertEquals(initVal, atomic.get().get1());
         assertEquals(initStamp, atomic.get().get2());
 
+        // It should lead to data loss because there are no backups in the atomic configuration.
+        restartClusterWithoutRebalancing();
+
+        checkAtomicOperation(atomic, op, "Failed to find atomic stamped with given name: " + atomicName);
+
+        assertTrue("Atomic instance should be removed.", atomic.removed());
+
+        IgniteAtomicStamped<String, String> recreatedAtomic = client.atomicStamped(atomicName, initVal, initStamp, true);
+
+        assertNotNull(recreatedAtomic);
+
+        assertEquals(initVal, recreatedAtomic.value());
+        assertEquals(initStamp, recreatedAtomic.stamp());
+        assertEquals(initVal, recreatedAtomic.get().get1());
+        assertEquals(initStamp, recreatedAtomic.get().get2());
+    }
+
+    /**
+     * Tests atomic sequence operation provided by the the given {@code clo}.
+     *
+     * @param atomicName Name of atomic sequnce.
+     * @param op Closure that represents an operation.
+     * @throws Exception If failed.
+     */
+    private void testAtomicSequenceReconnectClusterRestart(
+        String atomicName,
+        final IgniteInClosure<IgniteAtomicSequence> op
+    ) throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        AtomicConfiguration atomicCfg = new AtomicConfiguration()
+            .setBackups(0)
+            .setAffinity(new RendezvousAffinityFunction(false, 32))
+            .setAtomicSequenceReserveSize(1);
+
+        final IgniteAtomicSequence atomic = client.atomicSequence(atomicName, atomicCfg, 1L, true);
+
+        assertNotNull(atomic);
+
+        assertEquals("Unexpected initial value.", 1L, atomic.get());
+
+        // It should lead to data loss because there are no backups in the atomic configuration.
+        restartClusterWithoutRebalancing();
+
+        checkAtomicOperation(atomic, op, "Failed to find atomic sequence with the given name: " + atomicName);
+
+        assertTrue("Atomic sequnce instance should be removed.", atomic.removed());
+
+        IgniteAtomicSequence recreatedAtomicLong = client.atomicSequence(atomicName, atomicCfg, 100L, true);
+
+        assertEquals("Unexpected initial value.", 100L, recreatedAtomicLong.get());
+    }
+
+    /**
+     * Restarts the cluster without waiting for rebalancing.
+     *
+     * @throws Exception If failed.
+     */
+    private void restartClusterWithoutRebalancing() throws Exception {
         // Restart the cluster without waiting for rebalancing.
         // It should lead to data loss because there are no backups in the atomic configuration.
         for (int i = 0; i < serverCount(); ++i) {
@@ -347,26 +429,25 @@ public class IgniteClientReconnectAtomicsWithLostPartitionsTest extends IgniteCl
 
             startGrid(i);
         }
+    }
 
+    /**
+     * Checks that the operation that is represented by the given {@code clo} throws {@link IgniteException}.
+     *
+     * @param atomic Atomic data structure to be tested.
+     * @param clo Represent concrete operation.
+     * @param expMsg Expected exception message.
+     * @param <T> Type of atomic data structure.
+     */
+    private <T> void checkAtomicOperation(T atomic, IgniteInClosure<T> clo, String expMsg) {
         GridTestUtils.assertThrows(
             log,
             () -> {
-                op.apply(atomic);
+                clo.apply(atomic);
 
                 return null;
             },
             IgniteException.class,
-            "Failed to find atomic stamped with given name: " + atomicName);
-
-        assertTrue("Atomic instance should be removed.", atomic.removed());
-
-        IgniteAtomicStamped<String, String> recreatedAtomic = client.atomicStamped(atomicName, initVal, initStamp, true);
-
-        assertNotNull(recreatedAtomic);
-
-        assertEquals(initVal, recreatedAtomic.value());
-        assertEquals(initStamp, recreatedAtomic.stamp());
-        assertEquals(initVal, recreatedAtomic.get().get1());
-        assertEquals(initStamp, recreatedAtomic.get().get2());
+            expMsg);
     }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java
index 5c4d5d6..0e8440a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteAtomicSequence;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.configuration.AtomicConfiguration;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -338,7 +339,7 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics
 
             fail("Exception expected.");
         }
-        catch (IllegalStateException e) {
+        catch (IgniteException e) {
             info("Caught expected exception: " + e);
         }
     }