You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/21 10:22:25 UTC
[02/10] ignite git commit: ignite-6124 Merge exchanges for multiple
discovery events
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
index 14c3d4e..b8a91a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
@@ -126,6 +126,15 @@ public class IgniteThread extends Thread {
}
/**
+ * @return {@code True} if thread belongs to pool processing cache operations.
+ */
+ public boolean cachePoolThread() {
+ return stripe >= 0 ||
+ plc == GridIoPolicy.SYSTEM_POOL ||
+ plc == GridIoPolicy.UTILITY_CACHE_POOL;
+ }
+
+ /**
* Gets name of the Ignite instance this thread belongs to.
*
* @return Name of the Ignite instance this thread belongs to.
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/test/java/org/apache/ignite/internal/TestDelayingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestDelayingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestDelayingCommunicationSpi.java
new file mode 100644
index 0000000..e49d5da
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestDelayingCommunicationSpi.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.jsr166.ThreadLocalRandom8;
+
+/**
+ *
+ */
+public abstract class TestDelayingCommunicationSpi extends TcpCommunicationSpi {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
+ throws IgniteSpiException {
+ try {
+ GridIoMessage ioMsg = (GridIoMessage)msg;
+
+ if (delayMessage(ioMsg.message(), ioMsg))
+ U.sleep(ThreadLocalRandom8.current().nextInt(delayMillis()) + 1);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw new IgniteSpiException(e);
+ }
+
+ super.sendMessage(node, msg, ackC);
+ }
+
+ /**
+ * @param msg Message.
+ * @param ioMsg Wrapper message.
+ * @return {@code True} if need delay message.
+ */
+ protected abstract boolean delayMessage(Message msg, GridIoMessage ioMsg);
+
+ /**
+ * @return Max delay time.
+ */
+ protected int delayMillis() {
+ return 250;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
index 9f4fc80..bff63fb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
@@ -27,12 +27,12 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgnitePredicate;
@@ -328,6 +328,13 @@ public class CacheExchangeMessageDuplicatedStateTest extends GridCommonAbstractT
Map<Integer, Integer> dupPartsData,
GridDhtPartitionsSingleMessage msg)
{
+ if (!F.isEmpty(msg.cacheGroupsAffinityRequest())) {
+ for (GridDhtPartitionMap map : msg.partitions().values())
+ assertTrue(F.isEmpty(map.map()));
+
+ return;
+ }
+
int cache1Grp = groupIdForCache(ignite(0), cache1);
int cache2Grp = groupIdForCache(ignite(0), cache2);
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java
index f32e15f..eda0a49 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java
@@ -287,7 +287,9 @@ public class IgniteClientCacheStartFailoverTest extends GridCommonAbstractTest {
for (String cacheName : cacheNames) {
GridDhtPartitionTopology top = node.context().cache().internalCache(cacheName).context().topology();
- assertEquals(topVer, top.topologyVersion());
+ waitForReadyTopology(top, topVer);
+
+ assertEquals(topVer, top.readyTopologyVersion());
assertFalse(top.rebalanceFinished(topVer));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
index cb7c274..4c9ad27 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
@@ -1087,7 +1087,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
client = false;
- // Start one more nodes while transition is in progress.
+ // Start more nodes while transition is in progress.
IgniteInternalFuture startFut1 = GridTestUtils.runAsync(new Callable() {
@Override public Object call() throws Exception {
startGrid(8);
@@ -1106,7 +1106,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
U.sleep(500);
// Stop coordinator.
- stopGrid(0);
+ stopGrid(getTestIgniteInstanceName(0), true, false);
stopGrid(getTestIgniteInstanceName(1), true, false);
stopGrid(getTestIgniteInstanceName(4), true, false);
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 4f606c9..8f601f8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -324,9 +324,6 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
for (int g = 0; g < nodeCount(); g++) {
IgniteEx kernal0 = grid(g);
- for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures())
- f.get();
-
info("Getting cache for node: " + g);
assertNotNull(grid(g).cache(DYNAMIC_CACHE_NAME));
@@ -345,14 +342,13 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
kernal.destroyCache(DYNAMIC_CACHE_NAME);
+ awaitPartitionMapExchange();
+
for (int g = 0; g < nodeCount(); g++) {
final IgniteKernal kernal0 = (IgniteKernal)grid(g);
final int idx = g;
- for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures())
- f.get();
-
assertNull(kernal0.cache(DYNAMIC_CACHE_NAME));
GridTestUtils.assertThrows(log, new Callable<Object>() {
@@ -387,9 +383,6 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
for (int g = 0; g < nodeCount(); g++) {
IgniteEx kernal0 = grid(g);
- for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures())
- f.get();
-
info("Getting cache for node: " + g);
for (int i = 0; i < cacheCnt; i++)
@@ -423,6 +416,8 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
kernal.destroyCaches(namesToDestroy);
+ awaitPartitionMapExchange();
+
for (int g = 0; g < nodeCount(); g++) {
final IgniteKernal kernal0 = (IgniteKernal)grid(g);
@@ -430,9 +425,6 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
final int idx = g * nodeCount() + i;
final int expVal = i;
- for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures())
- f.get();
-
assertNull(kernal0.cache(DYNAMIC_CACHE_NAME));
GridTestUtils.assertThrows(log, new Callable<Object>() {
@@ -481,13 +473,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
startGrid(nodeCount() + 1);
+ awaitPartitionMapExchange();
+
// Check that cache is not deployed on new node after undeploy.
for (int g = 0; g < nodeCount() + 2; g++) {
final IgniteKernal kernal0 = (IgniteKernal)grid(g);
- for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures())
- f.get();
-
assertNull(kernal0.cache(DYNAMIC_CACHE_NAME));
}
}
@@ -537,13 +528,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
}
}
+ awaitPartitionMapExchange();
+
// Check that cache is not deployed on new node after undeploy.
for (int g = 0; g < nodeCount() + 2; g++) {
final IgniteKernal kernal0 = (IgniteKernal)grid(g);
- for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures())
- f.get();
-
if (g < nodeCount())
assertNotNull(grid(g).cache(DYNAMIC_CACHE_NAME));
else
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
index 2e551f9..b78c972 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
@@ -33,7 +33,7 @@ import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lifecycle.LifecycleAware;
@@ -271,9 +271,9 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
IgniteKernal kernal = (IgniteKernal)ignite;
- GridDhtCacheAdapter<Object, Object> dht = kernal.context().cache().internalCache(cacheName).context().dht();
+ GridDhtPartitionsExchangeFuture curFut = kernal.context().cache().context().exchange().lastTopologyFuture();
- long cacheTopVer = dht.topology().topologyVersionFuture().topologyVersion().topologyVersion();
+ long cacheTopVer = curFut.context().events().topologyVersion().topologyVersion();
if (hasSplit(nodes)) {
boolean resolved = activatorTopVer != 0 && cacheTopVer >= activatorTopVer;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/NonAffinityCoordinatorDynamicStartStopTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/NonAffinityCoordinatorDynamicStartStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/NonAffinityCoordinatorDynamicStartStopTest.java
index a4733d5..d065941 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/NonAffinityCoordinatorDynamicStartStopTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/NonAffinityCoordinatorDynamicStartStopTest.java
@@ -117,7 +117,7 @@ public class NonAffinityCoordinatorDynamicStartStopTest extends GridCommonAbstra
cache.destroy();
- grid("dummy").createCache(CCFG);
+ grid(DUMMY_GRID_NAME).createCache(CCFG);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 5656138..782482e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -767,7 +767,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
return null;
}
- });
+ }, "lock-thread");
// Wait until l.lock() has been called.
while(!l.hasQueuedThreads() && !done.get()){
@@ -778,6 +778,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
}
});
+ long endTime = System.currentTimeMillis() + getTestTimeout();
+
while (!fut.isDone()) {
try {
lock.lock();
@@ -797,6 +799,9 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
assertFalse(lock.isHeldByCurrentThread());
}
}
+
+ if (System.currentTimeMillis() > endTime)
+ fail("Failed to wait for topology change threads.");
}
fut.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
new file mode 100644
index 0000000..f93d60c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -0,0 +1,1528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.TestDelayingCommunicationSpi;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+
+/**
+ *
+ */
+public class CacheExchangeMergeTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final long WAIT_SECONDS = 15;
+
+ /** */
+ private ThreadLocal<Boolean> client = new ThreadLocal<>();
+
+ /** */
+ private boolean testSpi;
+
+ /** */
+ private boolean testDelaySpi;
+
+ /** */
+ private static String[] cacheNames = {"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10"};
+
+ /** */
+ private boolean cfgCache = true;
+
+ /** */
+ private IgniteClosure<String, Boolean> clientC;
+
+ /** */
+ private static ExecutorService executor;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ if (testSpi)
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+ else if (testDelaySpi)
+ cfg.setCommunicationSpi(new TestDelayExchangeMessagesSpi());
+
+ Boolean clientMode = client.get();
+
+ if (clientMode == null && clientC != null)
+ clientMode = clientC.apply(igniteInstanceName);
+
+ if (clientMode != null) {
+ cfg.setClientMode(clientMode);
+
+ client.set(null);
+ }
+
+ if (cfgCache) {
+ cfg.setCacheConfiguration(
+ cacheConfiguration("c1", ATOMIC, PARTITIONED, 0),
+ cacheConfiguration("c2", ATOMIC, PARTITIONED, 1),
+ cacheConfiguration("c3", ATOMIC, PARTITIONED, 2),
+ cacheConfiguration("c4", ATOMIC, PARTITIONED, 10),
+ cacheConfiguration("c5", ATOMIC, REPLICATED, 0),
+ cacheConfiguration("c6", TRANSACTIONAL, PARTITIONED, 0),
+ cacheConfiguration("c7", TRANSACTIONAL, PARTITIONED, 1),
+ cacheConfiguration("c8", TRANSACTIONAL, PARTITIONED, 2),
+ cacheConfiguration("c9", TRANSACTIONAL, PARTITIONED, 10),
+ cacheConfiguration("c10", TRANSACTIONAL, REPLICATED, 0)
+ );
+ }
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ if (executor != null)
+ executor.shutdown();
+
+ super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * @param name Cache name.
+ * @param atomicityMode Cache atomicity mode.
+ * @param cacheMode Cache mode.
+ * @param backups Number of backups.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration cacheConfiguration(String name,
+ CacheAtomicityMode atomicityMode,
+ CacheMode cacheMode,
+ int backups)
+ {
+ CacheConfiguration ccfg = new CacheConfiguration(name);
+
+ ccfg.setAtomicityMode(atomicityMode);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setCacheMode(cacheMode);
+
+ if (cacheMode == PARTITIONED)
+ ccfg.setBackups(backups);
+
+ return ccfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDelayExchangeMessages() throws Exception {
+ testDelaySpi = true;
+
+ System.setProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY, "2000");
+
+ try {
+ final int srvs = 6;
+ final int clients = 3;
+
+ startGridsMultiThreaded(srvs);
+
+ for (int i = 0; i < clients; i++) {
+ client.set(true);
+
+ startGrid(srvs + i);
+ }
+
+ final int initNodes = srvs + clients;
+
+ final AtomicInteger stopIdx = new AtomicInteger();
+
+ IgniteInternalFuture stopFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ Thread.sleep(ThreadLocalRandom.current().nextLong(500) + 1);
+
+ stopGrid(stopIdx.incrementAndGet());
+
+ return null;
+ }
+ }, 3, "stop-srv");
+
+ final AtomicInteger startIdx = new AtomicInteger(initNodes);
+
+ IgniteInternalFuture startFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ int nodeIdx = startIdx.incrementAndGet();
+
+ if (rnd.nextInt(3) == 0) {
+ log.info("Start client: " + nodeIdx);
+
+ client.set(true);
+ }
+ else
+ log.info("Start server: " + nodeIdx);
+
+ startGrid(nodeIdx);
+
+ if (rnd.nextBoolean()) {
+ log.info("Stop started node: " + nodeIdx);
+
+ stopGrid(nodeIdx);
+ }
+
+ return null;
+ }
+ }, 5, "start-node");
+
+ stopFut.get();
+ startFut.get();
+
+ checkCaches();
+ }
+ finally {
+ System.clearProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMergeStartRandomClientsServers() throws Exception {
+ for (int iter = 0; iter < 3; iter++) {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ final int srvs = rnd.nextInt(3) + 1;
+ final int clients = rnd.nextInt(3);
+
+ log.info("Iteration [iter=" + iter + ", srvs=" + srvs + ", clients=" + clients + ']');
+
+ Ignite srv0 = startGrids(srvs);
+
+ for (int i = 0; i < clients; i++) {
+ client.set(true);
+
+ startGrid(srvs + i);
+ }
+
+ final int threads = 8;
+
+ final int initNodes = srvs + clients;
+
+ mergeExchangeWaitVersion(srv0, initNodes + threads);
+
+ final AtomicInteger idx = new AtomicInteger(initNodes);
+
+ IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ int nodeIdx = idx.incrementAndGet();
+
+ if (rnd.nextInt(3) == 0) {
+ log.info("Start client: " + nodeIdx);
+
+ client.set(true);
+ }
+ else
+ log.info("Start server: " + nodeIdx);
+
+ startGrid(nodeIdx);
+
+ return null;
+ }
+ }, threads, "test-thread");
+
+ fut.get();
+
+ checkCaches();
+
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMergeStartStopRandomClientsServers() throws Exception {
+ for (int iter = 0; iter < 3; iter++) {
+ log.info("Iteration: " + iter);
+
+ final int srvs = 5;
+ final int clients = 5;
+
+ Ignite srv0 = startGrids(srvs);
+
+ for (int i = 0; i < clients; i++) {
+ client.set(true);
+
+ startGrid(srvs + i);
+ }
+
+ final int threads = 8;
+
+ final int initNodes = srvs + clients;
+
+ mergeExchangeWaitVersion(srv0, initNodes + threads);
+
+ final AtomicInteger idx = new AtomicInteger(initNodes);
+
+ final ConcurrentHashSet<Integer> stopNodes = new ConcurrentHashSet<>();
+
+ IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ if (rnd.nextBoolean()) {
+ Integer stopIdx;
+
+ for (;;) {
+ stopIdx = rnd.nextInt(initNodes - 1) + 1;
+
+ if (stopNodes.add(stopIdx))
+ break;
+ }
+
+ log.info("Stop node: " + stopIdx);
+
+ stopGrid(getTestIgniteInstanceName(stopIdx), true, false);
+ }
+ else {
+ int nodeIdx = idx.incrementAndGet();
+
+ if (rnd.nextInt(5) == 0) {
+ log.info("Start client: " + nodeIdx);
+
+ client.set(true);
+ }
+ else
+ log.info("Start server: " + nodeIdx);
+
+ startGrid(nodeIdx);
+ }
+
+ return null;
+ }
+ }, threads, "test-thread");
+
+ fut.get();
+
+ checkCaches();
+
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testConcurrentStartServers() throws Exception {
+ concurrentStart(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testConcurrentStartServersAndClients() throws Exception {
+ concurrentStart(true);
+ }
+
+ /**
+ * @param withClients If {@code true} also starts client nodes.
+ * @throws Exception If failed.
+ */
+ private void concurrentStart(final boolean withClients) throws Exception {
+ for (int i = 0; i < 5; i++) {
+ log.info("Iteration: " + i);
+
+ startGrid(0);
+
+ final AtomicInteger idx = new AtomicInteger(1);
+
+ IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ if (withClients)
+ client.set(ThreadLocalRandom.current().nextBoolean());
+
+ int nodeIdx = idx.getAndIncrement();
+
+ Ignite node = startGrid(nodeIdx);
+
+ checkNodeCaches(node, nodeIdx * 1000, 1000);
+
+ return null;
+ }
+ }, 10, "start-node");
+
+ fut.get();
+
+ checkCaches();
+
+ startGrid(1000);
+
+ checkCaches();
+
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMergeServerAndClientJoin1() throws Exception {
+ final IgniteEx srv0 = startGrid(0);
+
+ mergeExchangeWaitVersion(srv0, 3);
+
+ IgniteInternalFuture<?> fut1 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ startGrid(1);
+
+ return null;
+ }
+ }, 1, "start-srv");
+
+ waitForExchangeStart(srv0, 2);
+
+ IgniteInternalFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ client.set(true);
+
+ startGrid(2);
+
+ return null;
+ }
+ }, 1, "start-client");
+
+ fut1.get();
+ fut2.get();
+
+ checkCaches();
+
+ checkExchanges(srv0, 1, 3);
+ checkExchanges(ignite(1), 3);
+ checkExchanges(ignite(2), 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartCacheOnJoinAndJoinMerge_2_nodes() throws Exception {
+ startCacheOnJoinAndJoinMerge1(2, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartCacheOnJoinAndJoinMerge_4_nodes() throws Exception {
+ startCacheOnJoinAndJoinMerge1(4, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartCacheOnJoinAndJoinMerge_WithClients() throws Exception {
+ startCacheOnJoinAndJoinMerge1(5, true);
+ }
+
+ /**
+ * @param nodes Number of nodes to start.
+ * @param withClients If {@code true} starts both servers and clients.
+ * @throws Exception If failed.
+ */
+ private void startCacheOnJoinAndJoinMerge1(int nodes, boolean withClients) throws Exception {
+ cfgCache = false;
+
+ final IgniteEx srv0 = startGrid(0);
+
+ mergeExchangeWaitVersion(srv0, nodes + 1);
+
+ if (withClients) {
+ clientC = new IgniteClosure<String, Boolean>() {
+ @Override public Boolean apply(String nodeName) {
+ return getTestIgniteInstanceIndex(nodeName) % 2 == 0;
+ }
+ };
+ }
+
+ cfgCache = true;
+
+ IgniteInternalFuture fut = startGridsAsync(srv0, 1, nodes);
+
+ fut.get();
+
+ checkCaches();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMergeAndHistoryCleanup() throws Exception {
+ final int histSize = 5;
+
+ String oldHistVal = System.getProperty(IGNITE_EXCHANGE_HISTORY_SIZE);
+
+ System.setProperty(IGNITE_EXCHANGE_HISTORY_SIZE, String.valueOf(histSize));
+
+ try {
+ final Ignite srv0 = startGrid(0);
+
+ int topVer = 1;
+
+ for (int i = 0; i < 3; i++) {
+ mergeExchangeWaitVersion(srv0, topVer + 3);
+
+ startGridsAsync(srv0, topVer, 3).get();
+
+ topVer += 3;
+ }
+
+ checkHistorySize(histSize);
+
+ awaitPartitionMapExchange();
+
+ checkHistorySize(histSize);
+
+ mergeExchangeWaitVersion(srv0, topVer + 2);
+
+ stopGrid(1);
+ stopGrid(2);
+
+ checkHistorySize(histSize);
+
+ awaitPartitionMapExchange();
+
+ checkHistorySize(histSize);
+ }
+ finally {
+ if (oldHistVal != null)
+ System.setProperty(IGNITE_EXCHANGE_HISTORY_SIZE, oldHistVal);
+ else
+ System.clearProperty(IGNITE_EXCHANGE_HISTORY_SIZE);
+ }
+ }
+
+ /**
+ * @param histSize History size.
+ */
+ private void checkHistorySize(int histSize) {
+ List<Ignite> nodes = G.allGrids();
+
+ assertTrue(nodes.size() > 0);
+
+ for (Ignite node : nodes) {
+ List<GridDhtPartitionsExchangeFuture> exchFuts =
+ ((IgniteEx)node).context().cache().context().exchange().exchangeFutures();
+
+ assertTrue("Unexpected size: " + exchFuts.size(), exchFuts.size() > 0 && exchFuts.size() <= histSize);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartCacheOnJoinAndMergeWithFail() throws Exception {
+ cfgCache = false;
+
+ final Ignite srv0 = startGrids(2);
+
+ mergeExchangeWaitVersion(srv0, 5);
+
+ cfgCache = true;
+
+ IgniteInternalFuture fut = startGridsAsync(srv0, 2, 2);
+
+ stopGrid(1);
+
+ fut.get();
+
+ checkCaches();
+
+ checkExchanges(srv0, 1, 2, 3, 5);
+ checkExchanges(ignite(2), 3, 5);
+ checkExchanges(ignite(3), 5);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartCacheOnJoinAndCoordinatorFailed1() throws Exception {
+ cfgCache = false;
+
+ final Ignite srv0 = startGrids(2);
+
+ mergeExchangeWaitVersion(srv0, 5);
+
+ cfgCache = true;
+
+ IgniteInternalFuture fut = startGridsAsync(srv0, 2, 2);
+
+ stopGrid(0);
+
+ fut.get();
+
+ checkCaches();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartCacheOnJoinAndCoordinatorFailed2() throws Exception {
+ cfgCache = false;
+
+ final Ignite srv0 = startGrid(0);
+
+ mergeExchangeWaitVersion(srv0, 3);
+
+ cfgCache = true;
+
+ IgniteInternalFuture fut = startGridsAsync(srv0, 1, 2);
+
+ stopGrid(0);
+
+ fut.get();
+
+ checkCaches();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMergeServersJoin1() throws Exception {
+ IgniteEx srv0 = startGrid(0);
+
+ mergeExchangeWaitVersion(srv0, 3);
+
+ final AtomicInteger idx = new AtomicInteger(1);
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ startGrid(idx.getAndIncrement());
+
+ return null;
+ }
+ }, 2, "start-node");
+
+ fut.get();
+
+ checkCaches();
+
+ checkExchanges(srv0, 1, 3);
+ checkExchanges(ignite(1), 3);
+ checkExchanges(ignite(2), 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMergeServerJoin1ClientsInTopology() throws Exception {
+ IgniteEx srv0 = startGrid(0);
+
+ client.set(true);
+
+ startGrid(1);
+
+ client.set(true);
+
+ startGrid(2);
+
+ mergeExchangeWaitVersion(srv0, 5);
+
+ final AtomicInteger idx = new AtomicInteger(3);
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ startGrid(idx.getAndIncrement());
+
+ return null;
+ }
+ }, 2, "start-node");
+
+ fut.get();
+
+ checkCaches();
+
+ checkExchanges(srv0, 1, 2, 3, 5);
+ checkExchanges(ignite(1), 2, 3, 5);
+ checkExchanges(ignite(2), 3, 5);
+ checkExchanges(ignite(3), 5);
+ checkExchanges(ignite(4), 5);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMergeAndNewCoordinator() throws Exception {
+ final Ignite srv0 = startGrids(3);
+
+ mergeExchangeWaitVersion(srv0, 6);
+
+ IgniteInternalFuture fut = startGridsAsync(srv0, 3, 3);
+
+ fut.get();
+
+ checkCaches();
+
+ stopGrid(0);
+
+ checkCaches();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMergeServersFail1_1() throws Exception {
+ mergeServersFail1(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMergeServersFail1_2() throws Exception {
+ mergeServersFail1(true);
+ }
+
+ /**
+ * @param waitRebalance Wait for rebalance end before start tested topology change.
+ * @throws Exception If failed.
+ */
+ private void mergeServersFail1(boolean waitRebalance) throws Exception {
+ final Ignite srv0 = startGrids(4);
+
+ if (waitRebalance)
+ awaitPartitionMapExchange();
+
+ mergeExchangeWaitVersion(srv0, 6);
+
+ stopGrid(getTestIgniteInstanceName(3), true, false);
+ stopGrid(getTestIgniteInstanceName(2), true, false);
+
+ checkCaches();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMergeServersAndClientsFail1() throws Exception {
+ mergeServersAndClientsFail(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMergeServersAndClientsFail2() throws Exception {
+ mergeServersAndClientsFail(true);
+ }
+
+
+ /**
+ * @param waitRebalance Wait for rebalance end before start tested topology change.
+ * @throws Exception If failed.
+ */
+ private void mergeServersAndClientsFail(boolean waitRebalance) throws Exception {
+ clientC = new IgniteClosure<String, Boolean>() {
+ @Override public Boolean apply(String nodeName) {
+ return nodeName.equals(getTestIgniteInstanceName(2)) || nodeName.equals(getTestIgniteInstanceName(3));
+ }
+ };
+
+ final Ignite srv0 = startGrids(6);
+
+ if (waitRebalance)
+ awaitPartitionMapExchange();
+
+ mergeExchangeWaitVersion(srv0, 10);
+
+ stopGrid(getTestIgniteInstanceName(1), true, false);
+ stopGrid(getTestIgniteInstanceName(2), true, false);
+ stopGrid(getTestIgniteInstanceName(3), true, false);
+ stopGrid(getTestIgniteInstanceName(4), true, false);
+
+ checkAffinity();
+
+ mergeExchangeWaitVersion(srv0, 12);
+
+ IgniteInternalFuture fut = startGridsAsync(srv0, 6, 2);
+
+ fut.get();
+
+ checkCaches();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testJoinExchangeCoordinatorChange_NoMerge_1() throws Exception {
+ for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
+ exchangeCoordinatorChangeNoMerge(4, true, mode);
+
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testJoinExchangeCoordinatorChange_NoMerge_2() throws Exception {
+ for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
+ exchangeCoordinatorChangeNoMerge(8, true, mode);
+
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFailExchangeCoordinatorChange_NoMerge_1() throws Exception {
+ for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
+ exchangeCoordinatorChangeNoMerge(5, false, mode);
+
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFailExchangeCoordinatorChange_NoMerge_2() throws Exception {
+ for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
+ exchangeCoordinatorChangeNoMerge(8, false, mode);
+
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMergeJoinExchangesCoordinatorChange1_4_servers() throws Exception {
+ for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
+ mergeJoinExchangesCoordinatorChange1(4, mode);
+
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMergeJoinExchangesCoordinatorChange1_8_servers() throws Exception {
+ for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
+ mergeJoinExchangesCoordinatorChange1(8, mode);
+
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @param srvs Number of server nodes.
+ * @param mode Test mode.
+ * @throws Exception If failed.
+ */
+ private void mergeJoinExchangesCoordinatorChange1(final int srvs, CoordinatorChangeMode mode)
+ throws Exception
+ {
+ log.info("Test mergeJoinExchangesCoordinatorChange1 [srvs=" + srvs + ", mode=" + mode + ']');
+
+ testSpi = true;
+
+ Ignite srv0 = startGrids(srvs);
+
+ mergeExchangeWaitVersion(srv0, 6);
+
+ CountDownLatch latch = blockExchangeFinish(srvs, mode);
+
+ IgniteInternalFuture<?> fut = startGridsAsync(srv0, srvs, 2);
+
+ if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS))
+ fail("Failed to wait for expected messages.");
+
+ stopGrid(getTestIgniteInstanceName(0), true, false);
+
+ fut.get();
+
+ checkCaches();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMergeJoinExchangesCoordinatorChange2_4_servers() throws Exception {
+ mergeJoinExchangeCoordinatorChange2(4, 2, F.asList(1, 2, 3, 4), F.asList(5));
+
+ stopAllGrids();
+
+ mergeJoinExchangeCoordinatorChange2(4, 2, F.asList(1, 2, 3, 5), F.asList(4));
+ }
+
+ /**
+ * @param srvs Number of server nodes.
+ * @param startNodes Number of nodes to start.
+ * @param blockNodes Nodes which do not receive messages.
+ * @param waitMsgNodes Nodes which should receive messages.
+ * @throws Exception If failed.
+ */
+ private void mergeJoinExchangeCoordinatorChange2(final int srvs,
+ final int startNodes,
+ List<Integer> blockNodes,
+ List<Integer> waitMsgNodes) throws Exception
+ {
+ testSpi = true;
+
+ Ignite srv0 = startGrids(srvs);
+
+ mergeExchangeWaitVersion(srv0, srvs + startNodes);
+
+ CountDownLatch latch = blockExchangeFinish(srv0, srvs + 1, blockNodes, waitMsgNodes);
+
+ IgniteInternalFuture<?> fut = startGridsAsync(srv0, srvs, startNodes);
+
+ if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS))
+ fail("Failed to wait for expected messages.");
+
+ stopGrid(getTestIgniteInstanceName(0), true, false);
+
+ fut.get();
+
+ checkCaches();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMergeExchangeCoordinatorChange4() throws Exception {
+ testSpi = true;
+
+ final int srvs = 4;
+
+ Ignite srv0 = startGrids(srvs);
+
+ mergeExchangeWaitVersion(srv0, 6);
+
+ final AtomicInteger idx = new AtomicInteger(srvs);
+
+ CountDownLatch latch = blockExchangeFinish(srv0, 5, F.asList(1, 2, 3, 4), F.asList(5));
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ startGrid(idx.getAndIncrement());
+
+ return null;
+ }
+ }, 2, "start-node");
+
+ if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS))
+ fail("Failed to wait for expected messages.");
+
+ stopGrid(getTestIgniteInstanceName(0), true, false);
+
+ fut.get();
+
+ checkCaches();
+ }
+
+ /**
+ * @param srvs Number of servers.
+ * @param join If {@code true} starts new node, otherwise stops node.
+ * @param mode Tested scenario.
+ * @throws Exception If failed.
+ */
+ private void exchangeCoordinatorChangeNoMerge(int srvs, final boolean join, CoordinatorChangeMode mode) throws Exception {
+ log.info("Test mergeJoinExchangeCoordinatorChange [nodes=" + srvs + ", mode=" + mode + ']');
+
+ testSpi = true;
+
+ final int nodes = srvs;
+
+ startGrids(nodes);
+
+ CountDownLatch latch = blockExchangeFinish(srvs, mode);
+
+ IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() {
+ @Override public Object call() throws Exception {
+ if (join)
+ startGrid(nodes);
+ else
+ stopGrid(nodes - 1);
+
+ return null;
+ }
+ });
+
+ waitForExchangeStart(ignite(0), nodes + 1);
+
+ if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS))
+ fail("Failed to wait for expected messages.");
+
+ stopGrid(0);
+
+ fut.get();
+
+ checkCaches();
+ }
+
+ /**
+ * @param srvs Number of server nodes.
+ * @param mode Test scenario.
+ * @return Awaited state latch.
+ * @throws Exception If failed.
+ */
+ private CountDownLatch blockExchangeFinish(int srvs, CoordinatorChangeMode mode) throws Exception {
+ Ignite crd = ignite(0);
+
+ long topVer = srvs + 1;
+
+ switch (mode) {
+ case NOBODY_RCVD: {
+ blockExchangeFinish(crd, topVer);
+
+ break;
+ }
+
+ case NEW_CRD_RCDV: {
+ List<Integer> finishNodes = F.asList(1);
+
+ return blockExchangeFinish(crd, topVer, blockNodes(srvs, finishNodes), finishNodes);
+ }
+
+ case NON_CRD_RCVD: {
+ assert srvs > 2 : srvs;
+
+ List<Integer> finishNodes = F.asList(2);
+
+ return blockExchangeFinish(crd, topVer, blockNodes(srvs, finishNodes), finishNodes);
+ }
+
+ default:
+ fail();
+ }
+
+ return null;
+ }
+
+ /**
+ * @param srvs Number of servers.
+ * @param waitNodes Nodes which should receive message.
+ * @return Blocked nodes indexes.
+ */
+ private List<Integer> blockNodes(int srvs, List<Integer> waitNodes) {
+ List<Integer> block = new ArrayList<>();
+
+ for (int i = 0; i < srvs + 1; i++) {
+ if (!waitNodes.contains(i))
+ block.add(i);
+ }
+
+ return block;
+ }
+
+ /**
+ * @param crd Exchange coordinator.
+ * @param topVer Exchange topology version.
+ */
+ private void blockExchangeFinish(Ignite crd, long topVer) {
+ final AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer);
+
+ TestRecordingCommunicationSpi.spi(crd).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ if (msg instanceof GridDhtPartitionsFullMessage) {
+ GridDhtPartitionsFullMessage msg0 = (GridDhtPartitionsFullMessage)msg;
+
+ return msg0.exchangeId() != null && msg0.exchangeId().topologyVersion().equals(topVer0);
+ }
+
+ return false;
+ }
+ });
+ }
+
+ /**
+ * @param crd Exchange coordinator.
+ * @param topVer Exchange topology version.
+ * @param blockNodes Nodes which do not receive messages.
+ * @param waitMsgNodes Nodes which should receive messages.
+ * @return Awaited state latch.
+ */
+ private CountDownLatch blockExchangeFinish(Ignite crd,
+ long topVer,
+ final List<Integer> blockNodes,
+ final List<Integer> waitMsgNodes)
+ {
+ log.info("blockExchangeFinish [crd=" + crd.cluster().localNode().id() +
+ ", block=" + blockNodes +
+ ", wait=" + waitMsgNodes + ']');
+
+ final AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer);
+
+ final CountDownLatch latch = new CountDownLatch(waitMsgNodes.size());
+
+ TestRecordingCommunicationSpi.spi(crd).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ if (msg instanceof GridDhtPartitionsFullMessage) {
+ GridDhtPartitionsFullMessage msg0 = (GridDhtPartitionsFullMessage)msg;
+
+ if (msg0.exchangeId() == null || msg0.exchangeId().topologyVersion().compareTo(topVer0) < 0)
+ return false;
+
+ String name = node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME);
+
+ assert name != null : node;
+
+ for (Integer idx : blockNodes) {
+ if (name.equals(getTestIgniteInstanceName(idx)))
+ return true;
+ }
+
+ for (Integer idx : waitMsgNodes) {
+ if (name.equals(getTestIgniteInstanceName(idx))) {
+ log.info("Coordinators sends awaited message [node=" + node.id() + ']');
+
+ latch.countDown();
+ }
+ }
+ }
+
+ return false;
+ }
+ });
+
+ return latch;
+ }
+
+ /**
+ * @param node Node.
+ * @param topVer Ready exchange version to wait for before trying to merge exchanges.
+ */
+ private void mergeExchangeWaitVersion(Ignite node, long topVer) {
+ ((IgniteKernal)node).context().cache().context().exchange().mergeExchangesTestWaitVersion(
+ new AffinityTopologyVersion(topVer, 0));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkCaches() throws Exception {
+ checkAffinity();
+
+ checkCaches0();
+
+ checkAffinity();
+
+ awaitPartitionMapExchange();
+
+ checkCaches0();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkCaches0() throws Exception {
+ List<Ignite> nodes = G.allGrids();
+
+ assertTrue(nodes.size() > 0);
+
+ for (Ignite node : nodes)
+ checkNodeCaches(node);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkAffinity() throws Exception {
+ List<Ignite> nodes = G.allGrids();
+
+ ClusterNode crdNode = null;
+
+ for (Ignite node : nodes) {
+ ClusterNode locNode = node.cluster().localNode();
+
+ if (crdNode == null || locNode.order() < crdNode.order())
+ crdNode = locNode;
+ }
+
+ AffinityTopologyVersion topVer = ((IgniteKernal)grid(crdNode)).
+ context().cache().context().exchange().readyAffinityVersion();
+
+ Map<String, List<List<ClusterNode>>> affMap = new HashMap<>();
+
+ for (Ignite node : nodes) {
+ IgniteKernal node0 = (IgniteKernal)node;
+
+ for (IgniteInternalCache cache : node0.context().cache().caches()) {
+ List<List<ClusterNode>> aff = affMap.get(cache.name());
+ List<List<ClusterNode>> aff0 = cache.context().affinity().assignments(topVer);
+
+ if (aff != null)
+ assertEquals(aff, aff0);
+ else
+ affMap.put(cache.name(), aff0);
+ }
+ }
+ }
+
+ /**
+ * @param node Node.
+ * @param startKey Start key.
+ * @param keyRange Keys range.
+ */
+ private void checkNodeCaches(Ignite node, int startKey, int keyRange) {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ for (String cacheName : cacheNames) {
+ String err = "Invalid value [node=" + node.name() +
+ ", client=" + node.configuration().isClientMode() +
+ ", order=" + node.cluster().localNode().order() +
+ ", cache=" + cacheName + ']';
+
+ IgniteCache<Object, Object> cache = node.cache(cacheName);
+
+ for (int i = 0; i < 10; i++) {
+ Integer key = rnd.nextInt(keyRange) + startKey;
+
+ cache.put(key, i);
+
+ Object val = cache.get(key);
+
+ assertEquals(err, i, val);
+ }
+ }
+ }
+
+ /**
+ * @param node Node.
+ * @throws Exception If failed.
+ */
+ private void checkNodeCaches(final Ignite node) throws Exception {
+ log.info("Check node caches [node=" + node.name() + ']');
+
+ List<Future<?>> futs = new ArrayList<>();
+
+ for (final String cacheName : cacheNames) {
+ final IgniteCache<Object, Object> cache = node.cache(cacheName);
+
+ futs.add(executor.submit(new Runnable() {
+ @Override public void run() {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ assertNotNull("No cache [node=" + node.name() +
+ ", client=" + node.configuration().isClientMode() +
+ ", order=" + node.cluster().localNode().order() +
+ ", cache=" + cacheName + ']', cache);
+
+ String err = "Invalid value [node=" + node.name() +
+ ", client=" + node.configuration().isClientMode() +
+ ", order=" + node.cluster().localNode().order() +
+ ", cache=" + cacheName + ']';
+
+ for (int i = 0; i < 5; i++) {
+ Integer key = rnd.nextInt(20_000);
+
+ cache.put(key, i);
+
+ Object val = cache.get(key);
+
+ assertEquals(err, i, val);
+ }
+
+ for (int i = 0; i < 5; i++) {
+ Map<Integer, Integer> map = new TreeMap<>();
+
+ for (int j = 0; j < 10; j++) {
+ Integer key = rnd.nextInt(20_000);
+
+ map.put(key, i);
+ }
+
+ cache.putAll(map);
+
+ Map<Object, Object> res = cache.getAll(map.keySet());
+
+ for (Map.Entry<Integer, Integer> e : map.entrySet())
+ assertEquals(err, e.getValue(), res.get(e.getKey()));
+ }
+
+ if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL) {
+ for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+ for (TransactionIsolation isolation : TransactionIsolation.values())
+ checkNodeCaches(err, node, cache, concurrency, isolation);
+ }
+ }
+ }
+ }));
+ }
+
+ for (Future<?> fut : futs)
+ fut.get();
+ }
+
+ /**
+ * @param err Error message.
+ * @param node Node.
+ * @param cache Cache.
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
+ */
+ private void checkNodeCaches(
+ String err,
+ Ignite node,
+ IgniteCache<Object, Object> cache,
+ TransactionConcurrency concurrency,
+ TransactionIsolation isolation) {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ Map<Object, Object> map = new HashMap<>();
+
+ try {
+ try (Transaction tx = node.transactions().txStart(concurrency, isolation)) {
+ for (int i = 0; i < 5; i++) {
+ Integer key = rnd.nextInt(20_000);
+
+ cache.put(key, i);
+
+ Object val = cache.get(key);
+
+ assertEquals(i, val);
+
+ map.put(key, val);
+ }
+
+ tx.commit();
+ }
+ }
+ catch (ClusterTopologyException ignore) {
+ // No-op.
+ }
+
+ for (Map.Entry<Object, Object> e : map.entrySet())
+ assertEquals(err, e.getValue(), cache.get(e.getKey()));
+ }
+
+ /**
+ * @param node Node.
+ * @param vers Expected exchange versions.
+ */
+ private void checkExchanges(Ignite node, long... vers) {
+ IgniteKernal node0 = (IgniteKernal)node;
+
+ List<AffinityTopologyVersion> expVers = new ArrayList<>();
+
+ for (long ver : vers)
+ expVers.add(new AffinityTopologyVersion(ver));
+
+ List<AffinityTopologyVersion> doneVers = new ArrayList<>();
+
+ List<GridDhtPartitionsExchangeFuture> futs =
+ node0.context().cache().context().exchange().exchangeFutures();
+
+ for (int i = futs.size() - 1; i >= 0; i--) {
+ GridDhtPartitionsExchangeFuture fut = futs.get(i);
+
+ if (fut.exchangeDone() && fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT) {
+ AffinityTopologyVersion resVer = fut.topologyVersion();
+
+ if (resVer != null)
+ doneVers.add(resVer);
+ }
+ }
+
+ assertEquals(expVers, doneVers);
+
+ for (CacheGroupContext grpCtx : node0.context().cache().cacheGroups()) {
+ for (AffinityTopologyVersion ver : grpCtx.affinity().cachedVersions()) {
+ if (ver.minorTopologyVersion() > 0)
+ continue;
+
+ assertTrue("Unexpected version [ver=" + ver + ", exp=" + expVers + ']',
+ expVers.contains(ver));
+ }
+ }
+ }
+
+ /**
+ * @param node Node.
+ * @param topVer Exchange version.
+ * @throws Exception If failed.
+ */
+ private void waitForExchangeStart(final Ignite node, final long topVer) throws Exception {
+ final GridCachePartitionExchangeManager exch = ((IgniteKernal)node).context().cache().context().exchange();
+
+ boolean wait = GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return exch.lastTopologyFuture().initialVersion().topologyVersion() >= topVer;
+ }
+ }, 15_000);
+
+ assertTrue(wait);
+ }
+
+ /**
+ * Sequentially starts nodes so that node name is consistent with node order.
+ *
+ * @param node Some existing node.
+ * @param startIdx Start node index.
+ * @param cnt Number of nodes.
+ * @return Start future.
+ * @throws Exception If failed.
+ */
+ private IgniteInternalFuture startGridsAsync(Ignite node, int startIdx, int cnt) throws Exception {
+ GridCompoundFuture fut = new GridCompoundFuture();
+
+ for (int i = 0; i < cnt; i++) {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ node.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ log.info("Got event: " + ((DiscoveryEvent)evt).eventNode().id());
+
+ latch.countDown();
+
+ return false;
+ }
+ }, EventType.EVT_NODE_JOINED);
+
+ final int nodeIdx = startIdx + i;
+
+ IgniteInternalFuture fut0 = GridTestUtils.runAsync(new Callable() {
+ @Override public Object call() throws Exception {
+ log.info("Start new node: " + nodeIdx);
+
+ startGrid(nodeIdx);
+
+ return null;
+ }
+ }, "start-node-" + nodeIdx);
+
+ if (!latch.await(WAIT_SECONDS, TimeUnit.SECONDS))
+ fail();
+
+ fut.add(fut0);
+ }
+
+ fut.markInitialized();
+
+ return fut;
+ }
+
+ /**
+ *
+ */
+ enum CoordinatorChangeMode {
+ /**
+ * Coordinator failed, did not send full message.
+ */
+ NOBODY_RCVD,
+
+ /**
+ * Coordinator failed, but new coordinator received full message
+ * and finished exchange.
+ */
+ NEW_CRD_RCDV,
+
+ /**
+ * Coordinator failed, but one of servers (not new coordinator) received full message.
+ */
+ NON_CRD_RCVD
+ }
+
+ /**
+ *
+ */
+
+ static class TestDelayExchangeMessagesSpi extends TestDelayingCommunicationSpi {
+ /** {@inheritDoc} */
+ @Override protected boolean delayMessage(Message msg, GridIoMessage ioMsg) {
+ if (msg instanceof GridDhtPartitionsAbstractMessage)
+ return ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null || (msg instanceof GridDhtPartitionsSingleRequest);
+
+ return false;
+ }
+ }
+}