You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2022/02/27 18:46:53 UTC
[ignite] branch master updated: IGNITE-16621 Fixed an issue with IgniteAtomicSequence that led to AssertionError. Fixes #9847
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 2d900d1 IGNITE-16621 Fixed an issue with IgniteAtomicSequence that led to AssertionError. Fixes #9847
2d900d1 is described below
commit 2d900d1b56909b5ec98e2a9b65237d0e69c5da62
Author: Slava Koptilin <sl...@gmail.com>
AuthorDate: Sun Feb 27 21:46:12 2022 +0300
IGNITE-16621 Fixed an issue with IgniteAtomicSequence that led to AssertionError. Fixes #9847
---
.../datastructures/AtomicDataStructureProxy.java | 8 +-
.../GridCacheAtomicSequenceImpl.java | 54 +++----
.../internal/IgniteClientReconnectAtomicsTest.java | 17 +-
...ientReconnectAtomicsWithLostPartitionsTest.java | 171 +++++++++++++++------
.../GridCacheSequenceApiSelfAbstractTest.java | 3 +-
5 files changed, 161 insertions(+), 92 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/AtomicDataStructureProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/AtomicDataStructureProxy.java
index 62bed72..145768a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/AtomicDataStructureProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/AtomicDataStructureProxy.java
@@ -114,9 +114,9 @@ public abstract class AtomicDataStructureProxy<V extends AtomicDataStructureValu
/**
* Check removed status.
*
- * @throws IllegalStateException If removed.
+ * @throws IgniteException If removed.
*/
- protected void checkRemoved() throws IllegalStateException {
+ protected void checkRemoved() throws IgniteException {
if (rmvd)
throw removedError();
@@ -175,8 +175,8 @@ public abstract class AtomicDataStructureProxy<V extends AtomicDataStructureValu
/**
* @return Error.
*/
- private IllegalStateException removedError() {
- return new IllegalStateException("Sequence was removed from cache: " + name);
+ private IgniteException removedError() {
+ return new IgniteException("Sequence was removed from cache: " + name);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 43b53c6..55f7878 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -26,6 +26,7 @@ import java.io.ObjectStreamException;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import javax.cache.processor.EntryProcessorException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridKernalContext;
@@ -132,46 +133,26 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
/** {@inheritDoc} */
@Override public long incrementAndGet() {
- try {
- return internalUpdate(1, incAndGetCall, true);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
+ return internalUpdate(1, incAndGetCall, true);
}
/** {@inheritDoc} */
@Override public long getAndIncrement() {
- try {
- return internalUpdate(1, getAndIncCall, false);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
+ return internalUpdate(1, getAndIncCall, false);
}
/** {@inheritDoc} */
@Override public long addAndGet(long l) {
A.ensure(l > 0, " Parameter mustn't be less then 1: " + l);
- try {
- return internalUpdate(l, null, true);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
+ return internalUpdate(l, null, true);
}
/** {@inheritDoc} */
@Override public long getAndAdd(long l) {
A.ensure(l > 0, " Parameter mustn't be less then 1: " + l);
- try {
- return internalUpdate(l, null, false);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
+ return internalUpdate(l, null, false);
}
/**
@@ -182,17 +163,22 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
* @param updated If {@code true}, will return sequence value after update, otherwise will return sequence value
* prior to update.
* @return Sequence value.
- * @throws IgniteCheckedException If update failed.
+ * @throws IgniteException If update failed.
*/
- private long internalUpdate(long l, @Nullable Callable<Long> updateCall, boolean updated) throws IgniteCheckedException {
+ private long internalUpdate(long l, @Nullable Callable<Long> updateCall, boolean updated) throws IgniteException {
checkRemoved();
assert l > 0;
if (ctx.shared().readOnlyMode()) {
- throw new CacheInvalidStateException(new IgniteClusterReadOnlyException(
- String.format(CLUSTER_READ_ONLY_MODE_ERROR_MSG_FORMAT, "sequence", ctx.group().name(), ctx.name())
- ));
+ throw U.convertException(
+ new CacheInvalidStateException(
+ new IgniteClusterReadOnlyException(
+ String.format(CLUSTER_READ_ONLY_MODE_ERROR_MSG_FORMAT,
+ "sequence",
+ ctx.group().name(),
+ ctx.name())
+ )));
}
localUpdate.lock();
@@ -230,11 +216,8 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
try {
return CU.retryTopologySafe(updateCall);
}
- catch (IgniteCheckedException | IgniteException | IllegalStateException e) {
- throw e;
- }
catch (Exception e) {
- throw new IgniteCheckedException(e);
+ throw checkRemovedAfterFail(e);
}
}
finally {
@@ -305,7 +288,10 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
checkRemoved();
- assert seq != null;
+ if (seq == null) {
+ // This is the case when partition is lost and partition loss policy is IGNORE.
+ throw new EntryProcessorException("Failed to find atomic sequence with the given name: " + key.name());
+ }
long curLocVal;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index 5dbf9bf..f1cc274 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteAtomicStamped;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLock;
import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
@@ -78,7 +79,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
return null;
}
- }, IllegalStateException.class, null);
+ }, IgniteException.class, null);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
@@ -86,7 +87,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
return null;
}
- }, IllegalStateException.class, null);
+ }, IgniteException.class, null);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
@@ -94,7 +95,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
return null;
}
- }, IllegalStateException.class, null);
+ }, IgniteException.class, null);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
@@ -102,7 +103,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
return null;
}
- }, IllegalStateException.class, null);
+ }, IgniteException.class, null);
}
/**
@@ -177,7 +178,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
return null;
}
- }, IllegalStateException.class, null);
+ }, IgniteException.class, null);
IgniteAtomicSequence newClientAtomicSeq = client.atomicSequence("atomicSeqRmv", 0, true);
@@ -328,7 +329,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
return null;
}
- }, IllegalStateException.class, null);
+ }, IgniteException.class, null);
IgniteAtomicReference<String> newClientAtomicRef =
client.atomicReference("atomicRefRemoved", "1st value", true);
@@ -491,7 +492,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
return null;
}
- }, IllegalStateException.class, null);
+ }, IgniteException.class, null);
IgniteAtomicStamped newClientAtomicStamped = client.atomicStamped("atomicStampedRemoved", 0, 0, true);
@@ -640,7 +641,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
return null;
}
- }, IllegalStateException.class, null);
+ }, IgniteException.class, null);
IgniteAtomicLong newClientAtomicLong = client.atomicLong("atomicLongRmv", 0, true);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsWithLostPartitionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsWithLostPartitionsTest.java
index 72ce8ac..9f67d84 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsWithLostPartitionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsWithLostPartitionsTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteAtomicLong;
import org.apache.ignite.IgniteAtomicReference;
+import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteAtomicStamped;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
@@ -222,15 +223,59 @@ public class IgniteClientReconnectAtomicsWithLostPartitionsTest extends IgniteCl
}
/**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testAtomicSequenceAddAngGet() throws Exception {
+ testAtomicSequenceReconnectClusterRestart("atomic-sequence-addAndGet", atomic -> atomic.addAndGet(5L));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testAtomicSequenceGetAngAdd() throws Exception {
+ testAtomicSequenceReconnectClusterRestart("atomic-sequence-getAndAdd", atomic -> atomic.getAndAdd(5L));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testAtomicSequenceIncrementAndGet() throws Exception {
+ testAtomicSequenceReconnectClusterRestart(
+ "atomic-sequence-incrementAndGet",
+ atomic -> {
+ // Need to execute twice at least. See AtomicConfiguration.setAtomicSequenceReserveSize.
+ atomic.incrementAndGet();
+ atomic.incrementAndGet();
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testAtomicSequenceGetAndIncrement() throws Exception {
+ testAtomicSequenceReconnectClusterRestart(
+ "atomic-sequence-getAndIncrement",
+ atomic -> {
+ // Need to execute twice at least. See AtomicConfiguration.setAtomicSequenceReserveSize.
+ atomic.getAndIncrement();
+ atomic.getAndIncrement();
+ });
+ }
+
+ /**
* Tests atomic long operation provided by the the given {@code clo}.
*
* @param atomicName Name of atomic long.
- * @param clo Closure that represents an operation.
+ * @param op Closure that represents an operation.
* @throws Exception If failed.
*/
private void testAtomicLongReconnectClusterRestart(
String atomicName,
- final IgniteInClosure<IgniteAtomicLong> clo
+ final IgniteInClosure<IgniteAtomicLong> op
) throws Exception {
Ignite client = grid(serverCount());
@@ -244,21 +289,9 @@ public class IgniteClientReconnectAtomicsWithLostPartitionsTest extends IgniteCl
// Restart the cluster without waiting for rebalancing.
// It should lead to data loss because there are no backups in the atomic configuration.
- for (int i = 0; i < serverCount(); ++i) {
- grid(i).close();
+ restartClusterWithoutRebalancing();
- startGrid(i);
- }
-
- GridTestUtils.assertThrows(
- log,
- () -> {
- clo.apply(atomic);
-
- return null;
- },
- IgniteException.class,
- "Failed to find atomic long: " + atomicName);
+ checkAtomicOperation(atomic, op, "Failed to find atomic long: " + atomicName);
assertTrue("Atomic long instance should be removed.", atomic.removed());
@@ -271,12 +304,12 @@ public class IgniteClientReconnectAtomicsWithLostPartitionsTest extends IgniteCl
* Tests atomic reference operation provided by the the given {@code clo}.
*
* @param atomicName Name of atomic.
- * @param clo Closure that represents an operation.
+ * @param op Closure that represents an operation.
* @throws Exception If failed.
*/
private void testAtomicReferenceReconnectClusterRestart(
String atomicName,
- final IgniteInClosure<IgniteAtomicReference<Long>> clo
+ final IgniteInClosure<IgniteAtomicReference<Long>> op
) throws Exception {
Ignite client = grid(serverCount());
@@ -290,21 +323,9 @@ public class IgniteClientReconnectAtomicsWithLostPartitionsTest extends IgniteCl
// Restart the cluster without waiting for rebalancing.
// It should lead to data loss because there are no backups in the atomic configuration.
- for (int i = 0; i < serverCount(); ++i) {
- grid(i).close();
-
- startGrid(i);
- }
+ restartClusterWithoutRebalancing();
- GridTestUtils.assertThrows(
- log,
- () -> {
- clo.apply(atomic);
-
- return null;
- },
- IgniteException.class,
- "Failed to find atomic reference with given name: " + atomicName);
+ checkAtomicOperation(atomic, op, "Failed to find atomic reference with given name: " + atomicName);
assertTrue("Atomic instance should be removed.", atomic.removed());
@@ -340,6 +361,67 @@ public class IgniteClientReconnectAtomicsWithLostPartitionsTest extends IgniteCl
assertEquals(initVal, atomic.get().get1());
assertEquals(initStamp, atomic.get().get2());
+ // It should lead to data loss because there are no backups in the atomic configuration.
+ restartClusterWithoutRebalancing();
+
+ checkAtomicOperation(atomic, op, "Failed to find atomic stamped with given name: " + atomicName);
+
+ assertTrue("Atomic instance should be removed.", atomic.removed());
+
+ IgniteAtomicStamped<String, String> recreatedAtomic = client.atomicStamped(atomicName, initVal, initStamp, true);
+
+ assertNotNull(recreatedAtomic);
+
+ assertEquals(initVal, recreatedAtomic.value());
+ assertEquals(initStamp, recreatedAtomic.stamp());
+ assertEquals(initVal, recreatedAtomic.get().get1());
+ assertEquals(initStamp, recreatedAtomic.get().get2());
+ }
+
+ /**
+ * Tests atomic sequence operation provided by the the given {@code clo}.
+ *
+ * @param atomicName Name of atomic sequnce.
+ * @param op Closure that represents an operation.
+ * @throws Exception If failed.
+ */
+ private void testAtomicSequenceReconnectClusterRestart(
+ String atomicName,
+ final IgniteInClosure<IgniteAtomicSequence> op
+ ) throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ AtomicConfiguration atomicCfg = new AtomicConfiguration()
+ .setBackups(0)
+ .setAffinity(new RendezvousAffinityFunction(false, 32))
+ .setAtomicSequenceReserveSize(1);
+
+ final IgniteAtomicSequence atomic = client.atomicSequence(atomicName, atomicCfg, 1L, true);
+
+ assertNotNull(atomic);
+
+ assertEquals("Unexpected initial value.", 1L, atomic.get());
+
+ // It should lead to data loss because there are no backups in the atomic configuration.
+ restartClusterWithoutRebalancing();
+
+ checkAtomicOperation(atomic, op, "Failed to find atomic sequence with the given name: " + atomicName);
+
+ assertTrue("Atomic sequnce instance should be removed.", atomic.removed());
+
+ IgniteAtomicSequence recreatedAtomicLong = client.atomicSequence(atomicName, atomicCfg, 100L, true);
+
+ assertEquals("Unexpected initial value.", 100L, recreatedAtomicLong.get());
+ }
+
+ /**
+ * Restarts the cluster without waiting for rebalancing.
+ *
+ * @throws Exception If failed.
+ */
+ private void restartClusterWithoutRebalancing() throws Exception {
// Restart the cluster without waiting for rebalancing.
// It should lead to data loss because there are no backups in the atomic configuration.
for (int i = 0; i < serverCount(); ++i) {
@@ -347,26 +429,25 @@ public class IgniteClientReconnectAtomicsWithLostPartitionsTest extends IgniteCl
startGrid(i);
}
+ }
+ /**
+ * Checks that the operation that is represented by the given {@code clo} throws {@link IgniteException}.
+ *
+ * @param atomic Atomic data structure to be tested.
+ * @param clo Represent concrete operation.
+ * @param expMsg Expected exception message.
+ * @param <T> Type of atomic data structure.
+ */
+ private <T> void checkAtomicOperation(T atomic, IgniteInClosure<T> clo, String expMsg) {
GridTestUtils.assertThrows(
log,
() -> {
- op.apply(atomic);
+ clo.apply(atomic);
return null;
},
IgniteException.class,
- "Failed to find atomic stamped with given name: " + atomicName);
-
- assertTrue("Atomic instance should be removed.", atomic.removed());
-
- IgniteAtomicStamped<String, String> recreatedAtomic = client.atomicStamped(atomicName, initVal, initStamp, true);
-
- assertNotNull(recreatedAtomic);
-
- assertEquals(initVal, recreatedAtomic.value());
- assertEquals(initStamp, recreatedAtomic.stamp());
- assertEquals(initVal, recreatedAtomic.get().get1());
- assertEquals(initStamp, recreatedAtomic.get().get2());
+ expMsg);
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java
index 5c4d5d6..0e8440a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteAtomicSequence;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.AtomicConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -338,7 +339,7 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics
fail("Exception expected.");
}
- catch (IllegalStateException e) {
+ catch (IgniteException e) {
info("Caught expected exception: " + e);
}
}