You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/03 03:03:45 UTC
[36/50] [abbrv] ignite git commit: IGNITE-264 - Fixed affinity
topology version handling for nested internal transaction.
IGNITE-264 - Fixed affinity topology version handling for nested internal transaction.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d7602975
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d7602975
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d7602975
Branch: refs/heads/ignite-264
Commit: d7602975835ea790a3a03736ce62d0b52c064097
Parents: cc98802
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Wed Aug 26 17:55:53 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Wed Aug 26 17:55:53 2015 -0700
----------------------------------------------------------------------
.../processors/cache/GridCacheContext.java | 2 +-
.../processors/cache/GridCacheIoManager.java | 2 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 2 +-
.../colocated/GridDhtColocatedLockFuture.java | 8 +
.../distributed/near/GridNearLockFuture.java | 8 +
.../near/GridNearOptimisticTxPrepareFuture.java | 8 +
.../cache/transactions/IgniteTxAdapter.java | 1 +
.../cache/transactions/IgniteTxManager.java | 4 +-
.../datastructures/DataStructuresProcessor.java | 100 ++++---
...gniteAtomicLongChangingTopologySelfTest.java | 283 +++++++++++++++++++
.../IgniteCacheFailoverTestSuite.java | 1 +
11 files changed, 377 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 5f17746..e1a38c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -440,7 +440,7 @@ public class GridCacheContext<K, V> implements Externalizable {
* @return {@code True} if should use system transactions which are isolated from user transactions.
*/
public boolean systemTx() {
- return cacheType == CacheType.UTILITY;
+ return cacheType == CacheType.UTILITY || (cacheType == CacheType.INTERNAL && transactional());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 0ef190e..55e133e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -603,7 +603,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (!cctx.discovery().alive(node.id()) || !cctx.discovery().pingNode(node.id()))
throw new ClusterTopologyCheckedException("Node left grid while sending message to: " + node.id(), e);
- if (cnt == retryCnt)
+ if (cnt == retryCnt || cctx.kernalContext().isStopping())
throw e;
else if (log.isDebugEnabled())
log.debug("Failed to send message to node (will retry): " + node.id());
http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 07ec808..f6bb315 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -320,7 +320,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
public void map() {
AffinityTopologyVersion topVer = null;
- IgniteInternalTx tx = cctx.tm().anyActiveThreadTx();
+ IgniteInternalTx tx = cctx.tm().anyActiveThreadTx(null);
if (tx != null && tx.topologyVersionSnapshot() != null)
topVer = tx.topologyVersionSnapshot();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 90ca8df..7a2e717 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -518,6 +518,14 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
// Obtain the topology version to use.
AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
+ // If there is another system transaction in progress, use it's topology version to prevent deadlock.
+ if (topVer == null && tx != null && tx.system()) {
+ IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
+
+ if (tx0 != null)
+ topVer = tx0.topologyVersionSnapshot();
+ }
+
if (topVer != null && tx != null)
tx.topologyVersion(topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 2815194..daec1ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -657,6 +657,14 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
// Obtain the topology version to use.
AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+ // If there is another system transaction in progress, use it's topology version to prevent deadlock.
+ if (topVer == null && tx != null && tx.system()) {
+ IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
+
+ if (tx0 != null)
+ topVer = tx0.topologyVersionSnapshot();
+ }
+
if (topVer != null && tx != null)
tx.topologyVersion(topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index e51dcb0..4111c41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -234,6 +234,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
// Obtain the topology version to use.
AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+ // If there is another system transaction in progress, use it's topology version to prevent deadlock.
+ if (topVer == null && tx != null && tx.system()) {
+ IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
+
+ if (tx0 != null)
+ topVer = tx0.topologyVersionSnapshot();
+ }
+
if (topVer != null) {
tx.topologyVersion(topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index f9b2437..907a251 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -184,6 +184,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
private AtomicReference<GridFutureAdapter<IgniteInternalTx>> finFut = new AtomicReference<>();
/** Topology version. */
+ @GridToStringInclude
protected AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>(AffinityTopologyVersion.NONE);
/** Mutex. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 4554c6f..c96edd4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -646,7 +646,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
/**
* @return Any transaction associated with the current thread.
*/
- public IgniteInternalTx anyActiveThreadTx() {
+ public IgniteInternalTx anyActiveThreadTx(IgniteInternalTx ignore) {
long threadId = Thread.currentThread().getId();
IgniteInternalTx tx = threadMap.get(threadId);
@@ -660,7 +660,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId()));
- if (tx != null && tx.topologyVersionSnapshot() != null)
+ if (tx != null && tx != ignore && tx.topologyVersionSnapshot() != null)
return tx;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 57b16f0..87c5208 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -445,18 +445,28 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
if (!create)
return c.applyx();
- try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
+ while (true) {
+ try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
- if (err != null)
- throw err;
+ if (err != null)
+ throw err;
- dataStructure = c.applyx();
+ dataStructure = c.applyx();
- tx.commit();
- }
+ tx.commit();
+
+ return dataStructure;
+ }
+ catch (ClusterTopologyCheckedException e) {
+ IgniteInternalFuture<?> fut = e.retryReadyFuture();
- return dataStructure;
+ fut.get();
+ }
+ catch (IgniteTxRollbackCheckedException ignore) {
+ // Safe to retry right away.
+ }
+ }
}
/**
@@ -512,31 +522,39 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
if (err != null)
throw err;
- T rmvInfo;
+ while (true) {
+ try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ T2<Boolean, IgniteCheckedException> res =
+ utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get();
- try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- T2<Boolean, IgniteCheckedException> res =
- utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get();
+ err = res.get2();
- err = res.get2();
+ if (err != null)
+ throw err;
- if (err != null)
- throw err;
+ assert res.get1() != null;
- assert res.get1() != null;
+ boolean exists = res.get1();
- boolean exists = res.get1();
+ if (!exists)
+ return;
- if (!exists)
- return;
+ T rmvInfo = c.applyx();
- rmvInfo = c.applyx();
+ tx.commit();
- tx.commit();
- }
+ if (afterRmv != null && rmvInfo != null)
+ afterRmv.applyx(rmvInfo);
+ }
+ catch (ClusterTopologyCheckedException e) {
+ IgniteInternalFuture<?> fut = e.retryReadyFuture();
- if (afterRmv != null && rmvInfo != null)
- afterRmv.applyx(rmvInfo);
+ fut.get();
+ }
+ catch (IgniteTxRollbackCheckedException ignore) {
+ // Safe to retry right away.
+ }
+ }
}
/**
@@ -906,27 +924,35 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
return c.applyx(cacheCtx);
}
- T col;
+ while (true) {
+ try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ T2<String, IgniteCheckedException> res =
+ utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get();
- try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- T2<String, IgniteCheckedException> res =
- utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get();
+ err = res.get2();
- err = res.get2();
+ if (err != null)
+ throw err;
- if (err != null)
- throw err;
+ String cacheName = res.get1();
- String cacheName = res.get1();
+ final GridCacheContext cacheCtx = ctx.cache().internalCache(cacheName).context();
- final GridCacheContext cacheCtx = ctx.cache().internalCache(cacheName).context();
+ T col = c.applyx(cacheCtx);
- col = c.applyx(cacheCtx);
+ tx.commit();
- tx.commit();
- }
+ return col;
+ }
+ catch (ClusterTopologyCheckedException e) {
+ IgniteInternalFuture<?> fut = e.retryReadyFuture();
- return col;
+ fut.get();
+ }
+ catch (IgniteTxRollbackCheckedException ignore) {
+ // Safe to retry right away.
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
new file mode 100644
index 0000000..cee54b8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstractTest {
+ /**
+ * Grid count.
+ */
+ private static final int GRID_CNT = 5;
+
+ /**
+ * Restart count.
+ */
+ private static final int RESTART_CNT = 15;
+
+ /**
+ * Atomic long name.
+ */
+ private static final String ATOMIC_LONG_NAME = "test-atomic-long";
+
+ /**
+ * Queue.
+ */
+ private final Queue<Long> queue = new ConcurrentLinkedQueue<>();
+
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ AtomicConfiguration atomicCfg = new AtomicConfiguration();
+ atomicCfg.setCacheMode(CacheMode.PARTITIONED);
+ atomicCfg.setBackups(1);
+
+ cfg.setAtomicConfiguration(atomicCfg);
+
+ return cfg;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ queue.clear();
+ }
+
+ /**
+ *
+ */
+ public void testQueueCreateNodesJoin() throws Exception {
+ CountDownLatch startLatch = new CountDownLatch(GRID_CNT);
+ final AtomicBoolean run = new AtomicBoolean(true);
+
+ Collection<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+ for (int i = 0; i < GRID_CNT; i++)
+ futs.add(startNodeAndCreaterThread(i, startLatch, run));
+
+ startLatch.await();
+
+ info("All nodes started.");
+
+ Thread.sleep(10_000);
+
+ run.set(false);
+
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.get();
+
+ info("Increments: " + queue.size());
+
+ assert !queue.isEmpty();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testIncrementConsistency() throws Exception {
+ startGrids(GRID_CNT);
+
+ final AtomicBoolean run = new AtomicBoolean(true);
+
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ /** {@inheritDoc} */
+ @Override
+ public Void call() throws Exception {
+ IgniteAtomicLong cntr = ignite(0).atomicLong(ATOMIC_LONG_NAME, 0, true);
+
+ while (run.get())
+ queue.add(cntr.getAndIncrement());
+
+ return null;
+ }
+ }, 4, "increment-runner");
+
+ for (int i = 0; i < RESTART_CNT; i++) {
+ int restartIdx = ThreadLocalRandom.current().nextInt(GRID_CNT - 1) + 1;
+
+ stopGrid(restartIdx);
+
+ U.sleep(500);
+
+ startGrid(restartIdx);
+ }
+
+ run.set(false);
+
+ fut.get();
+
+ info("Increments: " + queue.size());
+
+ checkQueue();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueueClose() throws Exception {
+ startGrids(GRID_CNT);
+
+ int threads = 4;
+
+ final AtomicBoolean run = new AtomicBoolean(true);
+ final AtomicInteger idx = new AtomicInteger();
+ final AtomicReferenceArray<Exception> arr = new AtomicReferenceArray<>(threads);
+
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ /** {@inheritDoc} */
+ @Override
+ public Void call() throws Exception {
+ int base = idx.getAndIncrement();
+
+ try {
+ int delta = 0;
+
+ while (run.get()) {
+ IgniteAtomicLong cntr = ignite(0).atomicLong(ATOMIC_LONG_NAME + "-" + base + "-" + delta, 0, true);
+
+ for (int i = 0; i < 5; i++)
+ queue.add(cntr.getAndIncrement());
+
+ cntr.close();
+
+ delta++;
+ }
+ }
+ catch (Exception e) {
+ arr.set(base, e);
+
+ throw e;
+ }
+ finally {
+ info("RUNNER THREAD IS STOPPING");
+ }
+
+ return null;
+ }
+ }, threads, "increment-runner");
+
+ for (int i = 0; i < RESTART_CNT; i++) {
+ int restartIdx = ThreadLocalRandom.current().nextInt(GRID_CNT - 1) + 1;
+
+ stopGrid(restartIdx);
+
+ U.sleep(500);
+
+ startGrid(restartIdx);
+ }
+
+ run.set(false);
+
+ fut.get();
+
+ for (int i = 0; i < threads; i++) {
+ Exception err = arr.get(i);
+
+ if (err != null)
+ throw err;
+ }
+ }
+
+ /**
+ *
+ */
+ private void checkQueue() {
+ List<Long> list = new ArrayList<>(queue);
+
+ Collections.sort(list);
+
+ boolean failed = false;
+
+ int delta = 0;
+
+ for (int i = 0; i < list.size(); i++) {
+ Long exp = (long)(i + delta);
+
+ Long actual = list.get(i);
+
+ if (!exp.equals(actual)) {
+ failed = true;
+
+ delta++;
+
+ info(">>> Expected " + exp + ", actual " + actual);
+ }
+ }
+
+ assertFalse(failed);
+ }
+
+ /**
+ * @param i Node index.
+ */
+ private IgniteInternalFuture<?> startNodeAndCreaterThread(final int i, final CountDownLatch startLatch, final AtomicBoolean run)
+ throws Exception {
+ return multithreadedAsync(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Ignite ignite = startGrid(i);
+
+ startLatch.countDown();
+
+ while (run.get()) {
+ IgniteAtomicLong cntr = ignite.atomicLong(ATOMIC_LONG_NAME, 0, true);
+
+ queue.add(cntr.getAndIncrement());
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }, 1, "grunner-" + i);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
index b64471b..42e0f6f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
@@ -81,6 +81,7 @@ public class IgniteCacheFailoverTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheSizeFailoverTest.class);
suite.addTestSuite(IgniteCacheTopologySafeGetSelfTest.class);
+ suite.addTestSuite(IgniteAtomicLongChangingTopologySelfTest.class);
return suite;
}