You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/05/21 19:34:32 UTC

ignite git commit: IGNITE-8544 Use exchange result topology version for local wal state management. - Fixes #4039.

Repository: ignite
Updated Branches:
  refs/heads/master 028df980a -> fe38f3e38


IGNITE-8544 Use exchange result topology version for local wal state management. - Fixes #4039.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fe38f3e3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fe38f3e3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fe38f3e3

Branch: refs/heads/master
Commit: fe38f3e389ed2cf9ccdf4146a7960a3375ee4d72
Parents: 028df98
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Mon May 21 22:33:50 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon May 21 22:33:50 2018 +0300

----------------------------------------------------------------------
 .../GridDhtPartitionsExchangeFuture.java        |  2 +-
 ...lWalModeChangeDuringRebalancingSelfTest.java | 66 ++++++++++++++++++++
 2 files changed, 67 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fe38f3e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 1b79b76..c62b067 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1723,7 +1723,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     grp.topology().onExchangeDone(this, grp.affinity().readyAffinity(res), false);
             }
 
-            cctx.walState().changeLocalStatesOnExchangeDone(exchId.topologyVersion());
+            cctx.walState().changeLocalStatesOnExchangeDone(res);
         }
 
         if (super.onDone(res, err)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/fe38f3e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
index 07653f2..ca46a75 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
@@ -24,10 +24,12 @@ import java.nio.MappedByteBuffer;
 import java.nio.file.OpenOption;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
+import com.sun.org.apache.regexp.internal.RE;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -49,6 +51,7 @@ import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
 
 /**
  *
@@ -63,6 +66,9 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr
     /** */
     private static final AtomicReference<CountDownLatch> fileIOLatch = new AtomicReference<>();
 
+    /** Name of the cache that is configured with {@link CacheMode#REPLICATED} mode. */
+    private static final String REPL_CACHE = "cache";
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -83,7 +89,11 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr
         cfg.setCacheConfiguration(
             new CacheConfiguration(DEFAULT_CACHE_NAME)
                 // Test checks internal state before and after rebalance, so it is configured to be triggered manually
+                .setRebalanceDelay(-1),
+
+            new CacheConfiguration(REPL_CACHE)
                 .setRebalanceDelay(-1)
+                .setCacheMode(CacheMode.REPLICATED)
         );
 
         cfg.setCommunicationSpi(new TcpCommunicationSpi() {
@@ -295,6 +305,62 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr
     }
 
     /**
+     * Tests that the local WAL mode change works correctly when exchanges of concurrently started nodes are merged.
+     *
+     * @throws Exception If failed.
+     */
+    public void testWithExchangesMerge() throws Exception {
+        final int nodeCnt = 5;
+        final int keyCnt = 10_000;
+
+        Ignite ignite = startGrids(nodeCnt);
+
+        ignite.cluster().active(true);
+
+        IgniteCache<Integer, Integer> cache = ignite.cache(REPL_CACHE);
+
+        for (int k = 0; k < keyCnt; k++)
+            cache.put(k, k);
+
+        stopGrid(2);
+        stopGrid(3);
+        stopGrid(4);
+
+        // Overwrite every key (k -> 2 * k) while nodes 2-4 are stopped, so a rebalance is needed once they rejoin.
+        for (int k = 0; k < keyCnt; k++)
+            cache.put(k, k * 2);
+
+        // Restart the stopped nodes in parallel so that their join exchanges can be merged.
+        startGridsMultiThreaded(2, 3);
+
+        for (int nodeIdx = 2; nodeIdx < nodeCnt; nodeIdx++) {
+            CacheGroupContext grpCtx = grid(nodeIdx).cachex(REPL_CACHE).context().group();
+
+            assertFalse(grpCtx.walEnabled());
+        }
+
+        // Caches are configured with rebalance delay -1, so rebalance has to be requested explicitly on every node.
+        for (Ignite g : G.allGrids())
+            g.cache(REPL_CACHE).rebalance();
+
+        awaitPartitionMapExchange();
+
+        for (int nodeIdx = 2; nodeIdx < nodeCnt; nodeIdx++) {
+            CacheGroupContext grpCtx = grid(nodeIdx).cachex(REPL_CACHE).context().group();
+
+            assertTrue(grpCtx.walEnabled());
+        }
+
+        // Check no data loss: every restarted node must return the rewritten value (2 * k) for each key.
+        for (int nodeIdx = 2; nodeIdx < nodeCnt; nodeIdx++) {
+            IgniteCache<Integer, Integer> cache0 = grid(nodeIdx).cache(REPL_CACHE);
+
+            for (int k = 0; k < keyCnt; k++)
+                Assert.assertEquals("nodeIdx=" + nodeIdx + ", key=" + k, (Integer) (2 * k), cache0.get(k));
+        }
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testParallelExchangeDuringRebalance() throws Exception {