You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/05/27 18:13:55 UTC
[36/50] [abbrv] incubator-ignite git commit: # ignite-23
# ignite-23
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/32cb360f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/32cb360f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/32cb360f
Branch: refs/heads/ignite-943
Commit: 32cb360f736cdc84af8d461ad4c49cc2806cf996
Parents: 6432ec0
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 26 15:49:12 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 26 17:46:53 2015 +0300
----------------------------------------------------------------------
.../dht/atomic/GridDhtAtomicCache.java | 2 +-
.../GridDhtPartitionsExchangeFuture.java | 2 -
...niteCacheClientNodeChangingTopologyTest.java | 431 ++++++++++++++++---
3 files changed, 380 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32cb360f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 9135f0a..69f5501 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1062,7 +1062,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (ver == null) {
// Assign next version for update inside entries lock.
- ver = ctx.versions().next(req.topologyVersion());
+ ver = ctx.versions().next(topology().topologyVersion());
if (hasNear)
res.nearVersion(ver);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32cb360f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e04432f..1c1ebd5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -514,8 +514,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (exchId.isLeft())
cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
- cctx.mvcc().finishLocks(exchId.topologyVersion()).get();
-
onDone(exchId.topologyVersion());
skipPreload = true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32cb360f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index b067797..45fa275 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -211,7 +211,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
putFut.get();
- checkData(map, cache, 4);
+ checkData(map, null, cache, 4);
ignite3.close();
@@ -248,7 +248,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
putFut.get();
- checkData(map, cache, 4);
+ checkData(map, null, cache, 4);
for (int i = 0; i < KEYS; i++)
map.put(i, i + 2);
@@ -258,7 +258,106 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
else
cache.put(0, 2);
- checkData(map, cache, 4);
+ checkData(map, null, cache, 4);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicNoRemapClockMode() throws Exception {
+ atomicNoRemap(CLOCK);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicNoRemapPrimaryMode() throws Exception {
+ atomicNoRemap(PRIMARY);
+ }
+
+ /**
+ * @param writeOrder Write order.
+ * @throws Exception If failed.
+ */
+ private void atomicNoRemap(CacheAtomicWriteOrderMode writeOrder) throws Exception {
+ ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setBackups(1);
+ ccfg.setAtomicityMode(ATOMIC);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicWriteOrderMode(writeOrder);
+ ccfg.setRebalanceMode(SYNC);
+
+ IgniteEx ignite0 = startGrid(0);
+ IgniteEx ignite1 = startGrid(1);
+ IgniteEx ignite2 = startGrid(2);
+
+ client = true;
+
+ Ignite ignite3 = startGrid(3);
+
+ assertTrue(ignite3.configuration().isClientMode());
+
+ final Map<Integer, Integer> map = new HashMap<>();
+
+ map.put(primaryKey(ignite0.cache(null)), 0);
+ map.put(primaryKey(ignite1.cache(null)), 1);
+ map.put(primaryKey(ignite2.cache(null)), 2);
+
+ TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi();
+
+ // Block atomic update requests to all three nodes.
+ spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id());
+ spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id());
+ spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite2.localNode().id());
+
+ spi.record(GridNearAtomicUpdateRequest.class);
+
+ final IgniteCache<Integer, Integer> cache = ignite3.cache(null);
+
+ assertEquals(writeOrder, cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode());
+
+ IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ Thread.currentThread().setName("put-thread");
+
+ cache.putAll(map);
+
+ return null;
+ }
+ });
+
+ IgniteEx ignite4 = startGrid(4);
+
+ assertTrue(ignite4.configuration().isClientMode());
+
+ assertFalse(putFut.isDone());
+
+ log.info("Stop block.");
+
+ spi.stopBlock();
+
+ putFut.get();
+
+ spi.record(null);
+
+ checkData(map, null, cache, 5);
+
+ List<Object> msgs = spi.recordedMessages();
+
+ assertEquals(3, msgs.size());
+
+ for (Object msg : msgs)
+ assertTrue(((GridNearAtomicUpdateRequest)msg).clientRequest());
+
+ map.put(primaryKey(ignite0.cache(null)), 3);
+ map.put(primaryKey(ignite1.cache(null)), 4);
+ map.put(primaryKey(ignite2.cache(null)), 5);
+
+ cache.putAll(map);
+
+ checkData(map, null, cache, 5);
}
/**
@@ -334,7 +433,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
Integer old = putFut.get();
- checkData(map, cache, 4);
+ checkData(map, null, cache, 4);
assertEquals((Object)0, old);
}
@@ -394,7 +493,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
putFut.get();
- checkData(map, cache, 4);
+ checkData(map, null, cache, 4);
map.clear();
@@ -403,7 +502,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
cache.putAll(map);
- checkData(map, cache, 4);
+ checkData(map, null, cache, 4);
}
/**
* @throws Exception If failed.
@@ -452,6 +551,8 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id());
spi.blockMessages(GridNearLockRequest.class, ignite1.localNode().id());
+ spi.record(GridNearLockRequest.class);
+
final IgniteCache<Integer, Integer> cache = ignite2.cache(null);
IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() {
@@ -480,7 +581,17 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
putFut.get();
- checkData(map, cache, 4);
+ spi.record(null);
+
+ checkData(map, null, cache, 4);
+
+ List<Object> msgs = spi.recordedMessages();
+
+ assertTrue(((GridNearLockRequest)msgs.get(0)).firstClientRequest());
+ assertTrue(((GridNearLockRequest)msgs.get(1)).firstClientRequest());
+
+ for (int i = 2; i < msgs.size(); i++)
+ assertFalse(((GridNearLockRequest)msgs.get(i)).firstClientRequest());
ignite3.close();
@@ -513,7 +624,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
putFut.get();
- checkData(map, cache, 4);
+ checkData(map, null, cache, 4);
for (int i = 0; i < 100; i++)
map.put(i, i + 2);
@@ -524,7 +635,192 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
tx.commit();
}
- checkData(map, cache, 4);
+ checkData(map, null, cache, 4);
+ }
+
+ /**
+ * Tests a specific scenario where the mapping for the first locked key does not change, but the mapping for the second key does.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPessimisticTx2() throws Exception {
+ ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setBackups(1);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setRebalanceMode(SYNC);
+
+ IgniteEx ignite0 = startGrid(0);
+ IgniteEx ignite1 = startGrid(1);
+ IgniteEx ignite2 = startGrid(2);
+
+ client = true;
+
+ final Ignite ignite3 = startGrid(3);
+
+ assertTrue(ignite3.configuration().isClientMode());
+
+ AffinityTopologyVersion topVer1 = new AffinityTopologyVersion(4, 0);
+
+ assertEquals(topVer1, ignite0.context().cache().internalCache(null).context().topology().topologyVersion());
+
+ TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi();
+
+ final Integer key1 = 0;
+ final Integer key2 = 7;
+
+ spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id());
+ spi.blockMessages(GridNearLockRequest.class, ignite1.localNode().id());
+ spi.blockMessages(GridNearLockRequest.class, ignite2.localNode().id());
+
+ final IgniteCache<Integer, Integer> cache = ignite3.cache(null);
+
+ IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ Thread.currentThread().setName("put-thread");
+
+ try (Transaction tx = ignite3.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key1, 1);
+ cache.put(key2, 2);
+
+ tx.commit();
+ }
+
+ return null;
+ }
+ });
+
+ client = false;
+
+ IgniteEx ignite4 = startGrid(4);
+
+ AffinityTopologyVersion topVer2 = new AffinityTopologyVersion(5, 0);
+
+ assertEquals(topVer2, ignite0.context().cache().internalCache(null).context().topology().topologyVersion());
+
+ GridCacheAffinityManager aff = ignite0.context().cache().internalCache(null).context().affinity();
+
+ List<ClusterNode> nodes1 = aff.nodes(key1, topVer1);
+ List<ClusterNode> nodes2 = aff.nodes(key1, topVer2);
+
+ assertEquals(nodes1, nodes2);
+
+ nodes1 = aff.nodes(key2, topVer1);
+ nodes2 = aff.nodes(key2, topVer2);
+
+ assertFalse(nodes1.get(0).equals(nodes2.get(0)));
+
+ assertFalse(putFut.isDone());
+
+ log.info("Stop block.");
+
+ spi.stopBlock();
+
+ putFut.get();
+
+ checkData(F.asMap(key1, 1, key2, 2), null, cache, 5);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticTxNearEnabledNoRemap() throws Exception {
+ pessimisticTxNoRemap(new NearCacheConfiguration());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticTxNoRemap() throws Exception {
+ pessimisticTxNoRemap(null);
+ }
+
+ /**
+ * @param nearCfg Near cache configuration.
+ * @throws Exception If failed.
+ */
+ private void pessimisticTxNoRemap(@Nullable NearCacheConfiguration nearCfg) throws Exception {
+ ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setBackups(1);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setRebalanceMode(SYNC);
+ ccfg.setNearConfiguration(nearCfg);
+
+ IgniteEx ignite0 = startGrid(0);
+ IgniteEx ignite1 = startGrid(1);
+ IgniteEx ignite2 = startGrid(2);
+
+ client = true;
+
+ final Ignite ignite3 = startGrid(3);
+
+ assertTrue(ignite3.configuration().isClientMode());
+
+ TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi();
+
+ final Map<Integer, Integer> map = new HashMap<>();
+
+ for (int i = 0; i < 100; i++)
+ map.put(i, i);
+
+ spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id());
+ spi.blockMessages(GridNearLockRequest.class, ignite1.localNode().id());
+ spi.blockMessages(GridNearLockRequest.class, ignite2.localNode().id());
+
+ spi.record(GridNearLockRequest.class);
+
+ final IgniteCache<Integer, Integer> cache = ignite3.cache(null);
+
+ IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ Thread.currentThread().setName("put-thread");
+
+ try (Transaction tx = ignite3.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (Map.Entry<Integer, Integer> e : map.entrySet())
+ cache.put(e.getKey(), e.getValue());
+
+ tx.commit();
+ }
+
+ return null;
+ }
+ });
+
+ IgniteEx ignite4 = startGrid(4);
+
+ assertTrue(ignite4.configuration().isClientMode());
+
+ assertFalse(putFut.isDone());
+
+ log.info("Stop block.");
+
+ spi.stopBlock();
+
+ putFut.get();
+
+ spi.record(null);
+
+ checkData(map, null, cache, 5);
+
+ List<Object> msgs = spi.recordedMessages();
+
+ checkClientLockMessages(msgs, map.size());
+
+ for (int i = 0; i < 100; i++)
+ map.put(i, i + 1);
+
+ try (Transaction tx = ignite3.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.putAll(map);
+
+ tx.commit();
+ }
+
+ checkData(map, null, cache, 5);
}
/**
@@ -775,7 +1071,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
checkClientPrepareMessages(spi.recordedMessages(), 6);
- checkData(map, cache, 4);
+ checkData(map, null, cache, 4);
cache.putAll(map);
@@ -783,7 +1079,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
spi.record(null);
- checkData(map, cache, 4);
+ checkData(map, null, cache, 4);
IgniteCache<Integer, Integer> cache0 = ignite0.cache(null);
@@ -802,7 +1098,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
for (Object msg : msgs)
assertFalse(((GridNearTxPrepareRequest)msg).firstClientRequest());
- checkData(map, cache, 4);
+ checkData(map, null, cache, 4);
}
/**
@@ -944,11 +1240,15 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
/**
* @param map Expected data.
+ * @param keys Expected keys (if expected data is not specified).
* @param clientCache Client cache.
* @param expNodes Expected nodes number.
* @throws Exception If failed.
*/
- private void checkData(final Map<Integer, Integer> map, IgniteCache<?, ?> clientCache, final int expNodes)
+ private void checkData(final Map<Integer, Integer> map,
+ final Set<Integer> keys,
+ IgniteCache<?, ?> clientCache,
+ final int expNodes)
throws Exception
{
final List<Ignite> nodes = G.allGrids();
@@ -964,18 +1264,26 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
boolean wait = GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
try {
- for (Map.Entry<Integer, Integer> e : map.entrySet()) {
- Integer key = e.getKey();
+ Set<Integer> keys0 = map != null ? map.keySet() : keys;
+ assertNotNull(keys0);
+
+ for (Integer key : keys0) {
GridCacheVersion ver = null;
+ Object val = null;
for (Ignite node : nodes) {
IgniteCache<Integer, Integer> cache = node.cache(null);
boolean affNode = aff.isPrimaryOrBackup(node.cluster().localNode(), key);
+ Object val0 = cache.localPeek(key);
+
if (affNode || node == nearCacheNode) {
- assertEquals("Unexpected value for " + node.name(), e.getValue(), cache.localPeek(key));
+ if (map != null)
+ assertEquals("Unexpected value for " + node.name(), map.get(key), val0);
+ else
+ assertNotNull("Unexpected value for " + node.name(), val0);
GridCacheAdapter cache0 = ((IgniteKernal)node).internalCache(null);
@@ -991,13 +1299,28 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
assertNotNull("Null version [node=" + node.name() + ", key=" + key + ']', ver0);
- if (ver == null)
+ if (ver == null) {
ver = ver0;
- else
- assertEquals(ver0, ver);
+ val = val0;
+ }
+ else {
+ assertEquals("Version check failed [node=" + node.name() +
+ ", key=" + key +
+ ", affNode=" + affNode +
+ ", primary=" + aff.isPrimary(node.cluster().localNode(), key) + ']',
+ ver0,
+ ver);
+
+ assertEquals("Value check failed [node=" + node.name() +
+ ", key=" + key +
+ ", affNode=" + affNode +
+ ", primary=" + aff.isPrimary(node.cluster().localNode(), key) + ']',
+ val0,
+ val);
+ }
}
else
- assertNull("Unexpected non-null value for " + node.name(), cache.localPeek(key));
+ assertNull("Unexpected non-null value for " + node.name(), val0);
}
}
}
@@ -1080,10 +1403,6 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
clients.add(ignite);
}
- client = false;
-
- List<IgniteInternalFuture<?>> futs = new ArrayList<>();
-
final AtomicBoolean stop = new AtomicBoolean();
final AtomicInteger threadIdx = new AtomicInteger(0);
@@ -1092,8 +1411,10 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
final ConcurrentHashSet<Integer> putKeys = new ConcurrentHashSet<>();
+ IgniteInternalFuture<?> fut;
+
try {
- GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
int clientIdx = threadIdx.getAndIncrement() % CLIENT_CNT;
@@ -1122,7 +1443,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
for (int i = 0; i < 100; i++) {
Integer key = rnd.nextInt(0, 1000);
- map.put(key, key);
+ map.put(key, rnd.nextInt());
}
try {
@@ -1157,28 +1478,31 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
}
}, THREADS, "update-thread");
- for (final Ignite ignite : clients) {
+ long stopTime = System.currentTimeMillis() + 60_000;
- futs.add(GridTestUtils.runAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- Thread.currentThread().setName("update-" + ignite.name());
+ while (System.currentTimeMillis() < stopTime) {
+ boolean restartClient = ThreadLocalRandom.current().nextBoolean();
- log.info("Start updates from node: " + ignite.name());
+ Integer idx = null;
+ if (restartClient) {
+ log.info("Start client node.");
- return null;
- }
- }));
- }
+ client = true;
- long stopTime = System.currentTimeMillis() + 60_000;
+ IgniteEx ignite = startGrid(SRV_CNT + CLIENT_CNT);
- while (System.currentTimeMillis() < stopTime) {
- int idx = ThreadLocalRandom.current().nextInt(0, SRV_CNT);
+ IgniteCache<Integer, Integer> cache = ignite.cache(null);
- log.info("Stop node: " + idx);
+ assertNotNull(cache);
+ }
+ else {
+ idx = ThreadLocalRandom.current().nextInt(0, SRV_CNT);
+
+ log.info("Stop server node: " + idx);
- stopGrid(idx);
+ stopGrid(idx);
+ }
updateBarrier = new CyclicBarrier(THREADS + 1, new Runnable() {
@Override public void run() {
@@ -1190,7 +1514,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
updateBarrier.await(30_000, TimeUnit.MILLISECONDS);
}
catch (TimeoutException e) {
- log.info("Failed to wait for update.");
+ log.error("Failed to wait for update.");
U.dumpThreads(log);
@@ -1204,9 +1528,18 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
U.sleep(500);
- log.info("Start node: " + idx);
+ if (restartClient) {
+ log.info("Stop client node.");
- startGrid(idx);
+ stopGrid(SRV_CNT + CLIENT_CNT);
+ }
+ else {
+ log.info("Start server node: " + idx);
+
+ client = false;
+
+ startGrid(idx);
+ }
updateBarrier = new CyclicBarrier(THREADS + 1, new Runnable() {
@Override public void run() {
@@ -1218,7 +1551,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
updateBarrier.await(30_000, TimeUnit.MILLISECONDS);
}
catch (TimeoutException e) {
- log.info("Failed to wait for update.");
+ log.error("Failed to wait for update.");
U.dumpThreads(log);
@@ -1237,15 +1570,9 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
stop.set(true);
}
- for (IgniteInternalFuture<?> fut : futs)
- fut.get();
-
- Map<Integer, Integer> map = new HashMap<>();
-
- for (Integer key : putKeys)
- map.put(key, key);
+ fut.get(30_000);
- checkData(map, grid(SRV_CNT).cache(null), SRV_CNT + CLIENT_CNT);
+ checkData(null, putKeys, grid(SRV_CNT).cache(null), SRV_CNT + CLIENT_CNT);
}
/**