You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/10/21 16:16:48 UTC
[03/50] [abbrv] ignite git commit: ignite-1183 Fixed data structures
create/destroy from client node
ignite-1183 Fixed data structures create/destroy from client node
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3036c8d8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3036c8d8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3036c8d8
Branch: refs/heads/ignite-1.4-slow-server-debug
Commit: 3036c8d85db223a26f48f273ad08b6ea87e2e2ba
Parents: f025714
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 8 16:33:53 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 8 16:33:53 2015 +0300
----------------------------------------------------------------------
.../dht/atomic/GridNearAtomicUpdateFuture.java | 8 +-
.../colocated/GridDhtColocatedLockFuture.java | 11 +-
.../distributed/near/GridNearLockFuture.java | 11 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 24 +--
.../datastructures/DataStructuresProcessor.java | 48 ++++--
.../CacheGetFutureHangsSelfTest.java | 3 +
...niteCacheClientNodeChangingTopologyTest.java | 6 +-
...gniteAtomicLongChangingTopologySelfTest.java | 155 +++++++++++++++++--
8 files changed, 216 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 41df53a..97aa646 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -585,8 +585,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (req != null) {
res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion());
- res.addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before " +
- "response is received: " + nodeId));
+ ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
+ "before response is received: " + nodeId);
+
+ e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+
+ res.addFailedKeys(req.keys(), e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 33a5cbd..be09f54 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -598,7 +598,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
// Continue mapping on the same topology version as it was before.
this.topVer.compareAndSet(null, topVer);
- map(keys, false);
+ map(keys, false, true);
markInitialized();
@@ -654,7 +654,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
this.topVer.compareAndSet(null, topVer);
}
- map(keys, remap);
+ map(keys, remap, false);
if (c != null)
c.run();
@@ -691,8 +691,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
*
* @param keys Keys.
* @param remap Remap flag.
+ * @param topLocked {@code True} if thread already acquired lock preventing topology change.
*/
- private void map(Collection<KeyCacheObject> keys, boolean remap) {
+ private void map(Collection<KeyCacheObject> keys, boolean remap, boolean topLocked) {
try {
AffinityTopologyVersion topVer = this.topVer.get();
@@ -819,7 +820,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
boolean clientFirst = false;
if (first) {
- clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks());
+ clientFirst = clientNode &&
+ !topLocked &&
+ (tx == null || !tx.hasRemoteLocks());
first = false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index dcc8da6..e6b1e02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -718,7 +718,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
// Continue mapping on the same topology version as it was before.
this.topVer.compareAndSet(null, topVer);
- map(keys, false);
+ map(keys, false, true);
markInitialized();
@@ -773,7 +773,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
this.topVer.compareAndSet(null, topVer);
}
- map(keys, remap);
+ map(keys, remap, false);
markInitialized();
}
@@ -807,8 +807,9 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
*
* @param keys Keys.
* @param remap Remap flag.
+ * @param topLocked {@code True} if thread already acquired lock preventing topology change.
*/
- private void map(Iterable<KeyCacheObject> keys, boolean remap) {
+ private void map(Iterable<KeyCacheObject> keys, boolean remap, boolean topLocked) {
try {
AffinityTopologyVersion topVer = this.topVer.get();
@@ -938,7 +939,9 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
boolean clientFirst = false;
if (first) {
- clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks());
+ clientFirst = clientNode &&
+ !topLocked &&
+ (tx == null || !tx.hasRemoteLocks());
first = false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 25028c4..1fb33a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -271,7 +271,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
cctx.mvcc().addFuture(this);
- prepare0(false);
+ prepare0(false, true);
return;
}
@@ -338,7 +338,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
return;
}
- prepare0(remap);
+ prepare0(remap, false);
if (c != null)
c.run();
@@ -428,8 +428,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
* Initializes future.
*
* @param remap Remap flag.
+ * @param topLocked {@code True} if thread already acquired lock preventing topology change.
*/
- private void prepare0(boolean remap) {
+ private void prepare0(boolean remap, boolean topLocked) {
try {
boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING);
@@ -451,7 +452,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
prepare(
tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
- tx.writeEntries());
+ tx.writeEntries(),
+ topLocked);
markInitialized();
}
@@ -466,11 +468,13 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
/**
* @param reads Read entries.
* @param writes Write entries.
+ * @param topLocked {@code True} if thread already acquired lock preventing topology change.
* @throws IgniteCheckedException If failed.
*/
private void prepare(
Iterable<IgniteTxEntry> reads,
- Iterable<IgniteTxEntry> writes
+ Iterable<IgniteTxEntry> writes,
+ boolean topLocked
) throws IgniteCheckedException {
AffinityTopologyVersion topVer = tx.topologyVersion();
@@ -497,7 +501,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
GridDistributedTxMapping cur = null;
for (IgniteTxEntry read : reads) {
- GridDistributedTxMapping updated = map(read, topVer, cur, false);
+ GridDistributedTxMapping updated = map(read, topVer, cur, false, topLocked);
if (cur != updated) {
mappings.offer(updated);
@@ -514,7 +518,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
}
for (IgniteTxEntry write : writes) {
- GridDistributedTxMapping updated = map(write, topVer, cur, true);
+ GridDistributedTxMapping updated = map(write, topVer, cur, true, topLocked);
if (cur != updated) {
mappings.offer(updated);
@@ -647,13 +651,15 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
* @param topVer Topology version.
* @param cur Current mapping.
* @param waitLock Wait lock flag.
+ * @param topLocked {@code True} if thread already acquired lock preventing topology change.
* @return Mapping.
*/
private GridDistributedTxMapping map(
IgniteTxEntry entry,
AffinityTopologyVersion topVer,
@Nullable GridDistributedTxMapping cur,
- boolean waitLock
+ boolean waitLock,
+ boolean topLocked
) {
GridCacheContext cacheCtx = entry.context();
@@ -685,7 +691,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
}
if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
- boolean clientFirst = cur == null && cctx.kernalContext().clientNode();
+ boolean clientFirst = cur == null && !topLocked && cctx.kernalContext().clientNode();
cur = new GridDistributedTxMapping(primary);
http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index ef2c543..7c5e97c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -505,14 +505,20 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
return dataStructure;
}
- catch (ClusterTopologyCheckedException e) {
- IgniteInternalFuture<?> fut = e.retryReadyFuture();
-
- fut.get();
- }
catch (IgniteTxRollbackCheckedException ignore) {
// Safe to retry right away.
}
+ catch (IgniteCheckedException e) {
+ ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
+
+ if (topErr == null)
+ throw e;
+
+ IgniteInternalFuture<?> fut = topErr.retryReadyFuture();
+
+ if (fut != null)
+ fut.get();
+ }
}
}
@@ -593,14 +599,20 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
if (afterRmv != null && rmvInfo != null)
afterRmv.applyx(rmvInfo);
}
- catch (ClusterTopologyCheckedException e) {
- IgniteInternalFuture<?> fut = e.retryReadyFuture();
-
- fut.get();
- }
catch (IgniteTxRollbackCheckedException ignore) {
// Safe to retry right away.
}
+ catch (IgniteCheckedException e) {
+ ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
+
+ if (topErr == null)
+ throw e;
+
+ IgniteInternalFuture<?> fut = topErr.retryReadyFuture();
+
+ if (fut != null)
+ fut.get();
+ }
}
}
@@ -995,14 +1007,20 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
return col;
}
- catch (ClusterTopologyCheckedException e) {
- IgniteInternalFuture<?> fut = e.retryReadyFuture();
-
- fut.get();
- }
catch (IgniteTxRollbackCheckedException ignore) {
// Safe to retry right away.
}
+ catch (IgniteCheckedException e) {
+ ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
+
+ if (topErr == null)
+ throw e;
+
+ IgniteInternalFuture<?> fut = topErr.retryReadyFuture();
+
+ if (fut != null)
+ fut.get();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
index e8622aa..51e76f6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
@@ -54,6 +55,8 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
OptimizedMarshaller marsh = new OptimizedMarshaller();
marsh.setRequireSerializable(false);
http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index 2d29c49..1b3dc7a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -118,7 +118,11 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
cfg.setClientMode(client);
- cfg.setCommunicationSpi(new TestCommunicationSpi());
+ TestCommunicationSpi commSpi = new TestCommunicationSpi();
+
+ commSpi.setSharedMemoryPort(-1);
+
+ cfg.setCommunicationSpi(commSpi);
cfg.setCacheConfiguration(ccfg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
index 337334e..32a86e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
@@ -17,21 +17,37 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteAtomicLong;
+import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.configuration.AtomicConfiguration;
+import org.apache.ignite.configuration.CollectionConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.*;
-import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
/**
*
@@ -52,6 +68,9 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract
/** */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+ /** */
+ private boolean client;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -60,16 +79,18 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract
discoSpi.setIpFinder(IP_FINDER);
- cfg.setDiscoverySpi(discoSpi);
+ cfg.setDiscoverySpi(discoSpi).setNetworkTimeout(30_000);
AtomicConfiguration atomicCfg = new AtomicConfiguration();
- atomicCfg.setCacheMode(CacheMode.PARTITIONED);
+ atomicCfg.setCacheMode(PARTITIONED);
atomicCfg.setBackups(1);
cfg.setAtomicConfiguration(atomicCfg);
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+ cfg.setClientMode(client);
+
return cfg;
}
@@ -111,6 +132,110 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract
/**
* @throws Exception If failed.
*/
+ public void testClientAtomicLongCreateCloseFailover() throws Exception { // Runs the shared client failover scenario with an atomic-long create/close closure.
+ testFailoverWithClient(new IgniteInClosure<Ignite>() {
+ @Override public void apply(Ignite ignite) { // Invoked repeatedly by testFailoverWithClient with the client node.
+ for (int i = 0; i < 100; i++) { // 100 create/close cycles per closure call.
+ IgniteAtomicLong l = ignite.atomicLong("long-" + 1, 0, true); // NOTE(review): name is built with the literal 1, not the loop index i, so all 100 passes reuse one name - confirm this is intended.
+
+ l.close();
+ }
+ }
+ });
+ }
+
+ /** Runs the shared client failover scenario with a closure that creates and closes 100 queues per call.
+ * @throws Exception If failed.
+ */
+ public void testClientQueueCreateCloseFailover() throws Exception {
+ testFailoverWithClient(new IgniteInClosure<Ignite>() {
+ @Override public void apply(Ignite ignite) { // Invoked repeatedly by testFailoverWithClient with the client node.
+ for (int i = 0; i < 100; i++) {
+ CollectionConfiguration colCfg = new CollectionConfiguration(); // Fresh configuration for every queue.
+
+ colCfg.setBackups(1);
+ colCfg.setCacheMode(PARTITIONED);
+ colCfg.setAtomicityMode(i % 2 == 0 ? TRANSACTIONAL : ATOMIC); // Even i uses TRANSACTIONAL, odd i uses ATOMIC.
+
+ IgniteQueue q = ignite.queue("q-" + i, 0, colCfg); // Distinct name per iteration. NOTE(review): the 0 is presumably the capacity (unbounded) - confirm against the Ignite API.
+
+ q.close();
+ }
+ }
+ });
+ }
+
+ /** Starts GRID_CNT nodes plus one client node, then applies the closure to the client in a loop for about 30 seconds while a background task restarts nodes.
+ * @param c Test iteration closure; receives the client node on every loop pass.
+ * @throws Exception If failed.
+ */
+ private void testFailoverWithClient(IgniteInClosure<Ignite> c) throws Exception {
+ startGridsMultiThreaded(GRID_CNT, false);
+
+ client = true; // getConfiguration() passes this flag to cfg.setClientMode(), so the node started next is a client.
+
+ Ignite ignite = startGrid(GRID_CNT); // Client node gets the index right after the first GRID_CNT nodes.
+
+ assertTrue(ignite.configuration().isClientMode());
+
+ client = false; // Reset before the restart task begins calling startGrid(i); presumably so restarted nodes are not clients.
+
+ final AtomicBoolean finished = new AtomicBoolean();
+
+ IgniteInternalFuture<?> fut = restartThread(finished);
+
+ long stop = System.currentTimeMillis() + 30_000; // Deadline for the iteration loop below.
+
+ try {
+ int iter = 0;
+
+ while (System.currentTimeMillis() < stop) {
+ log.info("Iteration: " + iter++);
+
+ c.apply(ignite);
+ }
+
+ finished.set(true); // Ask the restart task to stop before waiting on it.
+
+ fut.get(); // Wait for the restart task; presumably rethrows its failure, if any - confirm.
+ }
+ finally {
+ finished.set(true); // Also signals the restart task when the loop exits with an exception; the future is not awaited on that path.
+ }
+ }
+
+ /** Launches an asynchronous task that stops and restarts nodes 0..GRID_CNT-1 one at a time until the flag is set.
+ * @param finished Finished flag; the task exits after it reads {@code true}.
+ * @return Future of the asynchronous restart task.
+ */
+ private IgniteInternalFuture<?> restartThread(final AtomicBoolean finished) {
+ return GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ while (!finished.get()) { // Keep cycling through all nodes until asked to stop.
+ for (int i = 0; i < GRID_CNT; i++) {
+ log.info("Stop node: " + i);
+
+ stopGrid(i);
+
+ U.sleep(500); // Pause while the node is down; presumably 500 ms - confirm U.sleep units.
+
+ log.info("Start node: " + i);
+
+ startGrid(i);
+
+ if (finished.get()) // Checked only after startGrid(i), so a stopped node is started again before the task exits.
+ break;
+ }
+ }
+
+ return null;
+ }
+ }, "restart-thread");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testIncrementConsistency() throws Exception {
startGrids(GRID_CNT);