You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/05/21 19:34:32 UTC
ignite git commit: IGNITE-8544 Use exchange result topology version
for local wal state management. - Fixes #4039.
Repository: ignite
Updated Branches:
refs/heads/master 028df980a -> fe38f3e38
IGNITE-8544 Use exchange result topology version for local wal state management. - Fixes #4039.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fe38f3e3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fe38f3e3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fe38f3e3
Branch: refs/heads/master
Commit: fe38f3e389ed2cf9ccdf4146a7960a3375ee4d72
Parents: 028df98
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Mon May 21 22:33:50 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon May 21 22:33:50 2018 +0300
----------------------------------------------------------------------
.../GridDhtPartitionsExchangeFuture.java | 2 +-
...lWalModeChangeDuringRebalancingSelfTest.java | 66 ++++++++++++++++++++
2 files changed, 67 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fe38f3e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 1b79b76..c62b067 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1723,7 +1723,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
grp.topology().onExchangeDone(this, grp.affinity().readyAffinity(res), false);
}
- cctx.walState().changeLocalStatesOnExchangeDone(exchId.topologyVersion());
+ cctx.walState().changeLocalStatesOnExchangeDone(res);
}
if (super.onDone(res, err)) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/fe38f3e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
index 07653f2..ca46a75 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
@@ -24,10 +24,12 @@ import java.nio.MappedByteBuffer;
import java.nio.file.OpenOption;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
+import com.sun.org.apache.regexp.internal.RE;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -49,6 +51,7 @@ import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
/**
*
@@ -63,6 +66,9 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr
/** */
private static final AtomicReference<CountDownLatch> fileIOLatch = new AtomicReference<>();
+ /** Replicated cache name. */
+ private static final String REPL_CACHE = "cache";
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -83,7 +89,11 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr
cfg.setCacheConfiguration(
new CacheConfiguration(DEFAULT_CACHE_NAME)
// Test checks internal state before and after rebalance, so it is configured to be triggered manually
+ .setRebalanceDelay(-1),
+
+ new CacheConfiguration(REPL_CACHE)
.setRebalanceDelay(-1)
+ .setCacheMode(CacheMode.REPLICATED)
);
cfg.setCommunicationSpi(new TcpCommunicationSpi() {
@@ -295,6 +305,62 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr
}
/**
+ * Test that local WAL mode changing works well with exchanges merge.
+ *
+ * @throws Exception If failed.
+ */
+ public void testWithExchangesMerge() throws Exception {
+ final int nodeCnt = 5;
+ final int keyCnt = 10_000;
+
+ Ignite ignite = startGrids(nodeCnt);
+
+ ignite.cluster().active(true);
+
+ IgniteCache<Integer, Integer> cache = ignite.cache(REPL_CACHE);
+
+ for (int k = 0; k < keyCnt; k++)
+ cache.put(k, k);
+
+ stopGrid(2);
+ stopGrid(3);
+ stopGrid(4);
+
+ // Rewrite data to trigger further rebalance.
+ for (int k = 0; k < keyCnt; k++)
+ cache.put(k, k * 2);
+
+ // Start several grids in parallel to trigger exchanges merge.
+ startGridsMultiThreaded(2, 3);
+
+ for (int nodeIdx = 2; nodeIdx < nodeCnt; nodeIdx++) {
+ CacheGroupContext grpCtx = grid(nodeIdx).cachex(REPL_CACHE).context().group();
+
+ assertFalse(grpCtx.walEnabled());
+ }
+
+ // Invoke rebalance manually.
+ for (Ignite g : G.allGrids())
+ g.cache(REPL_CACHE).rebalance();
+
+ awaitPartitionMapExchange();
+
+ for (int nodeIdx = 2; nodeIdx < nodeCnt; nodeIdx++) {
+ CacheGroupContext grpCtx = grid(nodeIdx).cachex(REPL_CACHE).context().group();
+
+ assertTrue(grpCtx.walEnabled());
+ }
+
+ // Check no data loss.
+ for (int nodeIdx = 2; nodeIdx < nodeCnt; nodeIdx++) {
+ IgniteCache<Integer, Integer> cache0 = grid(nodeIdx).cache(REPL_CACHE);
+
+ for (int k = 0; k < keyCnt; k++)
+ Assert.assertEquals("nodeIdx=" + nodeIdx + ", key=" + k, (Integer) (2 * k), cache0.get(k));
+ }
+ }
+
+ /**
* @throws Exception If failed.
*/
public void testParallelExchangeDuringRebalance() throws Exception {