You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/01/29 19:03:25 UTC
[ignite] branch master updated: IGNITE-14703 Fixed transactions failover. Fixes #8712
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 9af1eb4 IGNITE-14703 Fixed transactions failover. Fixes #8712
9af1eb4 is described below
commit 9af1eb4bf9b3425232a6b9a5109af35077e8548d
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Fri Jan 29 22:02:44 2021 +0300
IGNITE-14703 Fixed transactions failover. Fixes #8712
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../distributed/near/GridNearTxFinishFuture.java | 2 +-
.../near/IgniteTxExceptionNodeFailTest.java | 239 ++++++++++++++-------
2 files changed, 163 insertions(+), 78 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 8a25f86..7a4cf40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -990,7 +990,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
/** {@inheritDoc} */
@Override boolean onNodeLeft(UUID nodeId, boolean discoThread) {
if (tx.state() == COMMITTING || tx.state() == COMMITTED) {
- if (concat(of(m.primary().id()), tx.transactionNodes().getOrDefault(nodeId, emptySet()).stream())
+ if (concat(of(m.primary().id()), tx.transactionNodes().getOrDefault(m.primary().id(), emptySet()).stream())
.noneMatch(uuid -> cctx.discovery().alive(uuid))) {
onDone(new CacheInvalidStateException(ALL_PARTITION_OWNERS_LEFT_GRID_MSG +
m.entries().stream().map(e -> " [cacheName=" + e.cached().context().name() +
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxExceptionNodeFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxExceptionNodeFailTest.java
index f16c6ea..64a043a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxExceptionNodeFailTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxExceptionNodeFailTest.java
@@ -20,66 +20,78 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.ignite.ShutdownPolicy;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionHeuristicException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.springframework.util.Assert;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
import static org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture.ALL_PARTITION_OWNERS_LEFT_GRID_MSG;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled;
/**
- * Tests check a result of commit when a node fail before
- * send {@link GridNearTxFinishResponse} to transaction coodinator
+ * Tests check a result of commit when a node fail before send {@link GridNearTxFinishResponse} to transaction
+ * coordinator
*/
@RunWith(Parameterized.class)
public class IgniteTxExceptionNodeFailTest extends GridCommonAbstractTest {
+ /** Client node name. */
+ private static final String CLIENT = "client";
+
+ /** Node leave events for discovery event listener. */
+ private static final int[] TYPES = {EVT_NODE_LEFT, EVT_NODE_FAILED};
+
/** Parameters. */
@Parameterized.Parameters(name = "syncMode={0}")
- public static Iterable<Object[]> data() {
- return Arrays.asList(new Object[][] {
- { PRIMARY_SYNC },
- { FULL_SYNC },
- });
+ public static Iterable<CacheWriteSynchronizationMode> data() {
+ return Arrays.asList(PRIMARY_SYNC, FULL_SYNC);
}
/** syncMode */
@Parameterized.Parameter()
public CacheWriteSynchronizationMode syncMode;
+ /** Amount backups for cache. */
+ public int backups = 0;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
- DataStorageConfiguration dsConfig = new DataStorageConfiguration()
- .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setMaxSize(100L * 1024 * 1024)
- .setPersistenceEnabled(true));
-
- cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
-
- return cfg
- .setDataStorageConfiguration(dsConfig)
- .setCacheConfiguration(new CacheConfiguration("cache")
+ return super.getConfiguration(igniteInstanceName)
+ .setConsistentId(igniteInstanceName)
+ .setCommunicationSpi(new TestRecordingCommunicationSpi())
+ .setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setMaxSize(100L * 1024 * 1024)
+ .setPersistenceEnabled(true)))
+ .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
- .setWriteSynchronizationMode(syncMode).setBackups(0));
+ .setWriteSynchronizationMode(syncMode)
+ .setBackups(backups));
}
/** {@inheritDoc} */
@@ -113,85 +125,158 @@ public class IgniteTxExceptionNodeFailTest extends GridCommonAbstractTest {
*/
@Test
public void testNodeFailBeforeSendGridNearTxFinishResponse() throws Exception {
- startGrids(2);
+ IgniteEx grid0 = startGrids(2);
- grid(0).cluster().active(true);
+ grid0.cluster().state(ClusterState.ACTIVE);
- IgniteEx grid0 = grid(0);
IgniteEx grid1 = grid(1);
- int key0 = 0;
- int key1 = 0;
+ int key0 = primaryKey(grid0.cache(DEFAULT_CACHE_NAME));
+ int key1 = primaryKey(grid1.cache(DEFAULT_CACHE_NAME));
- Affinity<Object> aff = grid1.affinity("cache");
+ Affinity<Object> aff = grid1.affinity(DEFAULT_CACHE_NAME);
- for (int i = 1; i < 1000; i++) {
- if (grid0.equals(grid(aff.mapKeyToNode(i)))) {
- key0 = i;
+ assertFalse(
+ "Keys have the same mapping [key0=" + key0 + ", key1=" + key1 + ']',
+ aff.mapKeyToNode(key0).equals(aff.mapKeyToNode(key1))
+ );
+
+ spi(grid0).blockMessages(GridNearTxFinishResponse.class, getTestIgniteInstanceName(1));
+
+ IgniteInternalFuture stopNodeFut = GridTestUtils.runAsync(() -> {
+ try {
+ spi(grid0).waitForBlocked();
+ }
+ catch (InterruptedException e) {
+ log.error("Waiting is interrupted.", e);
+ }
+
+ info("Stopping node: [" + grid0.name() + ']');
+
+ grid0.close();
+
+ },
+ "node-stopper"
+ );
+
+ try (Transaction tx = grid1.transactions().txStart()) {
+ grid1.cache(DEFAULT_CACHE_NAME).put(key0, 100);
+ grid1.cache(DEFAULT_CACHE_NAME).put(key1, 200);
+
+ tx.commit();
+
+ fail("Transaction passed, but no one partition is alive.");
- break;
- }
}
+ catch (Exception e) {
+ assertTrue(X.hasCause(e, CacheInvalidStateException.class));
+
+ String msg = e.getMessage();
+
+ assertTrue(msg.contains(ALL_PARTITION_OWNERS_LEFT_GRID_MSG));
+
+ if (!mvccEnabled(grid1.context())) {
+ Pattern msgPtrn;
- for (int i = key0; i < 1000; i++) {
- if (grid1.equals(grid(aff.mapKeyToNode(i))) && !aff.mapKeyToNode(key0).equals(aff.mapKeyToNode(i))) {
- key1 = i;
+ msgPtrn = Pattern.compile(" \\[cacheName=" + DEFAULT_CACHE_NAME +
+ ", partition=\\d+, " +
+ "key=KeyCacheObjectImpl \\[part=\\d+, val=" + key0 +
+ ", hasValBytes=true\\]\\]");
- break;
+ Matcher matcher = msgPtrn.matcher(msg);
+
+ assertTrue("Message does not match: [msg=" + msg + ']', matcher.find());
}
}
- assert !aff.mapKeyToNode(key0).equals(aff.mapKeyToNode(key1));
+ stopNodeFut.get(10_000);
+ }
- try (Transaction tx = grid1.transactions().txStart()) {
- grid1.cache("cache").put(key0, 100);
- grid1.cache("cache").put(key1, 200);
-
- spi(grid0).blockMessages((node, msg) -> {
- if (msg instanceof GridNearTxFinishResponse) {
- new Thread(
- new Runnable() {
- @Override public void run() {
- log().info("Stopping node: [" + grid0.name() + "]");
-
- IgnitionEx.stop(grid0.name(), true, ShutdownPolicy.IMMEDIATE, false);
- }
- },
- "node-stopper"
- ).start();
-
- return true;
- }
-
- return false;
- }
- );
+ /**
+ * Test checks the all node leave detector when cache has backups enough.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void cacheWithBackups() throws Exception {
+ backups = 2;
- boolean passed = false;
+ IgniteEx ignite0 = startGrids(3);
- try {
- tx.commit();
- }
- catch (Throwable e) {
- String msg = e.getMessage();
+ ignite0.cluster().state(ClusterState.ACTIVE);
+
+ IgniteEx client = startClientGrid(CLIENT);
- Assert.isTrue(e.getCause() instanceof CacheInvalidStateException);
+ awaitPartitionMapExchange();
- Assert.isTrue(msg.contains(ALL_PARTITION_OWNERS_LEFT_GRID_MSG));
+ int key = primaryKey(ignite(1).cache(DEFAULT_CACHE_NAME));
- if (!mvccEnabled(grid1.context())) {
- Pattern msgPtrn = Pattern.compile(" \\[cacheName=cache, partition=\\d+, " + "key=KeyCacheObjectImpl \\[part=\\d+, val=" + key0 +
- ", hasValBytes=true\\]\\]");
+ spi(ignite(1)).blockMessages(GridNearTxFinishResponse.class, CLIENT);
- Matcher matcher = msgPtrn.matcher(msg);
+ spi(ignite(2)).blockMessages(GridDhtTxFinishResponse.class, CLIENT);
- Assert.isTrue(matcher.find());
+ new TestDiscoveryNodeLeftListener(CLIENT);
+
+ IgniteInternalFuture stopNodeFut = GridTestUtils.runAsync(() -> {
+ try {
+ spi(ignite(1)).waitForBlocked();
+ }
+ catch (InterruptedException e) {
+ log.error("Waiting is interrupted.", e);
}
- passed = true;
- }
+ info("Stopping node: [" + ignite(2).name() + ']');
+
+ ignite(2).close();
+
+ },
+ "node-stopper"
+ );
+
+ try (Transaction tx = client.transactions().txStart()) {
+ client.cache(DEFAULT_CACHE_NAME).put(key, 100);
+
+ tx.commit();
+ }
+ catch (Exception e) {
+ log.error("Transaction was not committed.", e);
+
+ fail("Transaction should be committed while at last one owner present [err=" + e.getMessage() + ']');
+ }
+
+ assertEquals(100, client.cache(DEFAULT_CACHE_NAME).get(key));
+
+ stopNodeFut.get(10_000);
+ }
+
+ /**
+ * A test discovery listener to freeze handling node left events.
+ */
+ private class TestDiscoveryNodeLeftListener implements DiscoveryEventListener, HighPriorityListener {
+ /** Name node to subscribe listener. */
+ private final String nodeToSubscribe;
+
+ /**
+ * @param nodeToSubscribe Node to subscribe.
+ */
+ public TestDiscoveryNodeLeftListener(String nodeToSubscribe) {
+ this.nodeToSubscribe = nodeToSubscribe;
+
+ grid(nodeToSubscribe).context().event().addDiscoveryEventListener(this, TYPES);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) {
+ info("Stopping node: [" + ignite(1).name() + ']');
+
+ ignite(1).close();
+
+ grid(nodeToSubscribe).context().event().removeDiscoveryEventListener(this, TYPES);
+ }
- Assert.isTrue(passed);
+ /** {@inheritDoc} */
+ @Override public int order() {
+ return 0;
}
}
}