You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/01/29 19:03:25 UTC

[ignite] branch master updated: IGNITE-14703 Fixed transactions failover. Fixes #8712

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 9af1eb4  IGNITE-14703 Fixed transactions failover. Fixes #8712
9af1eb4 is described below

commit 9af1eb4bf9b3425232a6b9a5109af35077e8548d
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Fri Jan 29 22:02:44 2021 +0300

    IGNITE-14703 Fixed transactions failover. Fixes #8712
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../distributed/near/GridNearTxFinishFuture.java   |   2 +-
 .../near/IgniteTxExceptionNodeFailTest.java        | 239 ++++++++++++++-------
 2 files changed, 163 insertions(+), 78 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 8a25f86..7a4cf40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -990,7 +990,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
         /** {@inheritDoc} */
         @Override boolean onNodeLeft(UUID nodeId, boolean discoThread) {
             if (tx.state() == COMMITTING || tx.state() == COMMITTED) {
-                if (concat(of(m.primary().id()), tx.transactionNodes().getOrDefault(nodeId, emptySet()).stream())
+                if (concat(of(m.primary().id()), tx.transactionNodes().getOrDefault(m.primary().id(), emptySet()).stream())
                     .noneMatch(uuid -> cctx.discovery().alive(uuid))) {
                     onDone(new CacheInvalidStateException(ALL_PARTITION_OWNERS_LEFT_GRID_MSG +
                         m.entries().stream().map(e -> " [cacheName=" + e.cached().context().name() +
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxExceptionNodeFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxExceptionNodeFailTest.java
index f16c6ea..64a043a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxExceptionNodeFailTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxExceptionNodeFailTest.java
@@ -20,66 +20,78 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 import java.util.Arrays;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import org.apache.ignite.ShutdownPolicy;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
 import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionHeuristicException;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.springframework.util.Assert;
 
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
 import static org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture.ALL_PARTITION_OWNERS_LEFT_GRID_MSG;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled;
 
 /**
- * Tests check a result of commit when a node fail before
- * send {@link GridNearTxFinishResponse} to transaction coodinator
+ * Tests check the result of a commit when a node fails before sending {@link GridNearTxFinishResponse} to the
+ * transaction coordinator.
  */
 @RunWith(Parameterized.class)
 public class IgniteTxExceptionNodeFailTest extends GridCommonAbstractTest {
+    /** Client node name. */
+    private static final String CLIENT = "client";
+
+    /** Node leave events for discovery event listener. */
+    private static final int[] TYPES = {EVT_NODE_LEFT, EVT_NODE_FAILED};
+
     /** Parameters. */
     @Parameterized.Parameters(name = "syncMode={0}")
-    public static Iterable<Object[]> data() {
-        return Arrays.asList(new Object[][] {
-            { PRIMARY_SYNC },
-            { FULL_SYNC },
-        });
+    public static Iterable<CacheWriteSynchronizationMode> data() {
+        return Arrays.asList(PRIMARY_SYNC, FULL_SYNC);
     }
 
     /** syncMode */
     @Parameterized.Parameter()
     public CacheWriteSynchronizationMode syncMode;
 
+    /** Number of backups for the cache. */
+    public int backups = 0;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
-        DataStorageConfiguration dsConfig = new DataStorageConfiguration()
-            .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setMaxSize(100L * 1024 * 1024)
-                .setPersistenceEnabled(true));
-
-        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
-
-        return cfg
-            .setDataStorageConfiguration(dsConfig)
-            .setCacheConfiguration(new CacheConfiguration("cache")
+        return super.getConfiguration(igniteInstanceName)
+            .setConsistentId(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                    .setMaxSize(100L * 1024 * 1024)
+                    .setPersistenceEnabled(true)))
+            .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
                 .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
-                .setWriteSynchronizationMode(syncMode).setBackups(0));
+                .setWriteSynchronizationMode(syncMode)
+                .setBackups(backups));
     }
 
     /** {@inheritDoc} */
@@ -113,85 +125,158 @@ public class IgniteTxExceptionNodeFailTest extends GridCommonAbstractTest {
      */
     @Test
     public void testNodeFailBeforeSendGridNearTxFinishResponse() throws Exception {
-        startGrids(2);
+        IgniteEx grid0 = startGrids(2);
 
-        grid(0).cluster().active(true);
+        grid0.cluster().state(ClusterState.ACTIVE);
 
-        IgniteEx grid0 = grid(0);
         IgniteEx grid1 = grid(1);
 
-        int key0 = 0;
-        int key1 = 0;
+        int key0 = primaryKey(grid0.cache(DEFAULT_CACHE_NAME));
+        int key1 = primaryKey(grid1.cache(DEFAULT_CACHE_NAME));
 
-        Affinity<Object> aff = grid1.affinity("cache");
+        Affinity<Object> aff = grid1.affinity(DEFAULT_CACHE_NAME);
 
-        for (int i = 1; i < 1000; i++) {
-            if (grid0.equals(grid(aff.mapKeyToNode(i)))) {
-                key0 = i;
+        assertFalse(
+            "Keys have the same mapping [key0=" + key0 + ", key1=" + key1 + ']',
+            aff.mapKeyToNode(key0).equals(aff.mapKeyToNode(key1))
+        );
+
+        spi(grid0).blockMessages(GridNearTxFinishResponse.class, getTestIgniteInstanceName(1));
+
+        IgniteInternalFuture stopNodeFut = GridTestUtils.runAsync(() -> {
+                try {
+                    spi(grid0).waitForBlocked();
+                }
+                catch (InterruptedException e) {
+                    log.error("Waiting is interrupted.", e);
+                }
+
+                info("Stopping node: [" + grid0.name() + ']');
+
+                grid0.close();
+
+            },
+            "node-stopper"
+        );
+
+        try (Transaction tx = grid1.transactions().txStart()) {
+            grid1.cache(DEFAULT_CACHE_NAME).put(key0, 100);
+            grid1.cache(DEFAULT_CACHE_NAME).put(key1, 200);
+
+            tx.commit();
+
+            fail("Transaction passed, but no one partition is alive.");
 
-                break;
-            }
         }
+        catch (Exception e) {
+            assertTrue(X.hasCause(e, CacheInvalidStateException.class));
+
+            String msg = e.getMessage();
+
+            assertTrue(msg.contains(ALL_PARTITION_OWNERS_LEFT_GRID_MSG));
+
+            if (!mvccEnabled(grid1.context())) {
+                Pattern msgPtrn;
 
-        for (int i = key0; i < 1000; i++) {
-            if (grid1.equals(grid(aff.mapKeyToNode(i))) && !aff.mapKeyToNode(key0).equals(aff.mapKeyToNode(i))) {
-                key1 = i;
+                msgPtrn = Pattern.compile(" \\[cacheName=" + DEFAULT_CACHE_NAME +
+                    ", partition=\\d+, " +
+                    "key=KeyCacheObjectImpl \\[part=\\d+, val=" + key0 +
+                    ", hasValBytes=true\\]\\]");
 
-                break;
+                Matcher matcher = msgPtrn.matcher(msg);
+
+                assertTrue("Message does not match: [msg=" + msg + ']', matcher.find());
             }
         }
 
-        assert !aff.mapKeyToNode(key0).equals(aff.mapKeyToNode(key1));
+        stopNodeFut.get(10_000);
+    }
 
-        try (Transaction tx = grid1.transactions().txStart()) {
-            grid1.cache("cache").put(key0, 100);
-            grid1.cache("cache").put(key1, 200);
-
-            spi(grid0).blockMessages((node, msg) -> {
-                    if (msg instanceof GridNearTxFinishResponse) {
-                        new Thread(
-                            new Runnable() {
-                                @Override public void run() {
-                                    log().info("Stopping node: [" + grid0.name() + "]");
-
-                                    IgnitionEx.stop(grid0.name(), true, ShutdownPolicy.IMMEDIATE, false);
-                                }
-                            },
-                            "node-stopper"
-                        ).start();
-
-                        return true;
-                    }
-
-                    return false;
-                }
-            );
+    /**
+     * Test checks the all-nodes-left detector when the cache has enough backups.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void cacheWithBackups() throws Exception {
+        backups = 2;
 
-            boolean passed = false;
+        IgniteEx ignite0 = startGrids(3);
 
-            try {
-                tx.commit();
-            }
-            catch (Throwable e) {
-                String msg = e.getMessage();
+        ignite0.cluster().state(ClusterState.ACTIVE);
+
+        IgniteEx client = startClientGrid(CLIENT);
 
-                Assert.isTrue(e.getCause() instanceof CacheInvalidStateException);
+        awaitPartitionMapExchange();
 
-                Assert.isTrue(msg.contains(ALL_PARTITION_OWNERS_LEFT_GRID_MSG));
+        int key = primaryKey(ignite(1).cache(DEFAULT_CACHE_NAME));
 
-                if (!mvccEnabled(grid1.context())) {
-                    Pattern msgPtrn = Pattern.compile(" \\[cacheName=cache, partition=\\d+, " + "key=KeyCacheObjectImpl \\[part=\\d+, val=" + key0 +
-                        ", hasValBytes=true\\]\\]");
+        spi(ignite(1)).blockMessages(GridNearTxFinishResponse.class, CLIENT);
 
-                    Matcher matcher = msgPtrn.matcher(msg);
+        spi(ignite(2)).blockMessages(GridDhtTxFinishResponse.class, CLIENT);
 
-                    Assert.isTrue(matcher.find());
+        new TestDiscoveryNodeLeftListener(CLIENT);
+
+        IgniteInternalFuture stopNodeFut = GridTestUtils.runAsync(() -> {
+                try {
+                    spi(ignite(1)).waitForBlocked();
+                }
+                catch (InterruptedException e) {
+                    log.error("Waiting is interrupted.", e);
                 }
 
-                passed = true;
-            }
+                info("Stopping node: [" + ignite(2).name() + ']');
+
+                ignite(2).close();
+
+            },
+            "node-stopper"
+        );
+
+        try (Transaction tx = client.transactions().txStart()) {
+            client.cache(DEFAULT_CACHE_NAME).put(key, 100);
+
+            tx.commit();
+        }
+        catch (Exception e) {
+            log.error("Transaction was not committed.", e);
+
+            fail("Transaction should be committed while at last one owner present [err=" + e.getMessage() + ']');
+        }
+
+        assertEquals(100, client.cache(DEFAULT_CACHE_NAME).get(key));
+
+        stopNodeFut.get(10_000);
+    }
+
+    /**
+     * A test discovery listener to freeze handling node left events.
+     */
+    private class TestDiscoveryNodeLeftListener implements DiscoveryEventListener, HighPriorityListener {
+        /** Name of the node to subscribe the listener to. */
+        private final String nodeToSubscribe;
+
+        /**
+         * @param nodeToSubscribe Node to subscribe.
+         */
+        public TestDiscoveryNodeLeftListener(String nodeToSubscribe) {
+            this.nodeToSubscribe = nodeToSubscribe;
+
+            grid(nodeToSubscribe).context().event().addDiscoveryEventListener(this, TYPES);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) {
+            info("Stopping node: [" + ignite(1).name() + ']');
+
+            ignite(1).close();
+
+            grid(nodeToSubscribe).context().event().removeDiscoveryEventListener(this, TYPES);
+        }
 
-            Assert.isTrue(passed);
+        /** {@inheritDoc} */
+        @Override public int order() {
+            return 0;
         }
     }
 }