You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/11/08 18:23:08 UTC
[12/50] [abbrv] ignite git commit: ignite-1607 Fixes for serializable
txs on changing topology
ignite-1607 Fixes for serializable txs on changing topology
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/11d177f5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/11d177f5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/11d177f5
Branch: refs/heads/ignite-1702
Commit: 11d177f5d5d2c2066c2d1fa6e28c9b1a4052d6c6
Parents: 6ea3b56
Author: sboikov <sb...@gridgain.com>
Authored: Mon Nov 2 09:02:29 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Nov 2 09:02:29 2015 +0300
----------------------------------------------------------------------
.../distributed/dht/GridDhtTxPrepareFuture.java | 44 +--
...arOptimisticSerializableTxPrepareFuture.java | 12 +-
.../cache/transactions/IgniteTxManager.java | 4 +-
.../CacheSerializableTransactionsTest.java | 313 ++++++++++++-------
4 files changed, 240 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/11d177f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index d806801..61975d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -573,27 +573,35 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (tx.onePhaseCommit() && tx.commitOnPrepare()) {
assert last;
+ Throwable prepErr = this.err.get();
+
// Must create prepare response before transaction is committed to grab correct return value.
- final GridNearTxPrepareResponse res = createPrepareResponse();
+ final GridNearTxPrepareResponse res = createPrepareResponse(prepErr);
onComplete(res);
if (tx.commitOnPrepare()) {
if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) {
- IgniteInternalFuture<IgniteInternalTx> fut = this.err.get() == null ?
- tx.commitAsync() : tx.rollbackAsync();
+ IgniteInternalFuture<IgniteInternalTx> fut = null;
- fut.listen(new CIX1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
- try {
- if (replied.compareAndSet(false, true))
- sendPrepareResponse(res);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send prepare response for transaction: " + tx, e);
+ if (prepErr == null)
+ fut = tx.commitAsync();
+ else if (!cctx.kernalContext().isStopping())
+ fut = tx.rollbackAsync();
+
+ if (fut != null) {
+ fut.listen(new CIX1<IgniteInternalFuture<IgniteInternalTx>>() {
+ @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
+ try {
+ if (replied.compareAndSet(false, true))
+ sendPrepareResponse(res);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send prepare response for transaction: " + tx, e);
+ }
}
- }
- });
+ });
+ }
}
}
else {
@@ -610,7 +618,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
else {
if (replied.compareAndSet(false, true)) {
- GridNearTxPrepareResponse res = createPrepareResponse();
+ GridNearTxPrepareResponse res = createPrepareResponse(this.err.get());
try {
sendPrepareResponse(res);
@@ -659,12 +667,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
/**
+ * @param prepErr Error.
* @return Prepare response.
*/
- private GridNearTxPrepareResponse createPrepareResponse() {
- // Send reply back to originating near node.
- Throwable prepErr = err.get();
-
+ private GridNearTxPrepareResponse createPrepareResponse(@Nullable Throwable prepErr) {
assert F.isEmpty(tx.invalidPartitions());
GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
@@ -981,7 +987,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (err0 != null) {
err.compareAndSet(null, err0);
- final GridNearTxPrepareResponse res = createPrepareResponse();
+ final GridNearTxPrepareResponse res = createPrepareResponse(err.get());
tx.rollbackAsync().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
@Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/11d177f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 47c1d21..5488bb1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -809,18 +809,22 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
remap(res);
}
catch (IgniteCheckedException e) {
+ err.compareAndSet(null, e);
+
onDone(e);
}
}
});
}
else {
- ClusterTopologyCheckedException err = new ClusterTopologyCheckedException(
+ ClusterTopologyCheckedException err0 = new ClusterTopologyCheckedException(
"Cluster topology changed while client transaction is preparing.");
- err.retryReadyFuture(affFut);
+ err0.retryReadyFuture(affFut);
+
+ err.compareAndSet(null, err0);
- onDone(err);
+ onDone(err0);
}
}
catch (IgniteCheckedException e) {
@@ -829,6 +833,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
GridNearOptimisticSerializableTxPrepareFuture.this);
}
+ err.compareAndSet(null, e);
+
onDone(e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/11d177f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index c1e9202..1f51b8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1617,8 +1617,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
{
for (final IgniteInternalTx tx : txs()) {
if (nearVer.equals(tx.nearXidVersion())) {
- TransactionState state = tx.state();
-
IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();
if (prepFut != null && !prepFut.isDone()) {
@@ -1648,6 +1646,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
return fut0;
}
+ TransactionState state = tx.state();
+
if (state == PREPARED || state == COMMITTING || state == COMMITTED) {
if (--txNum == 0) {
if (fut != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/11d177f5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index 8c135ad..ae64bb4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
@@ -60,6 +61,7 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -115,6 +117,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
cfg.setClientMode(client);
cfg.setSwapSpaceSpi(new GridTestSwapSpaceSpi());
@@ -187,7 +191,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
txStreamerLoad(ignite(SRVS), 10_000, cache.getName(), allowOverwrite);
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -290,7 +294,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -348,7 +352,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -414,7 +418,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -495,7 +499,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -562,7 +566,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -609,7 +613,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
checkValue(key, null, cache.getName());
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -640,7 +644,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -718,7 +722,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -821,7 +825,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -914,7 +918,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -1007,7 +1011,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -1101,7 +1105,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -1187,7 +1191,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -1273,7 +1277,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -1398,7 +1402,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -1523,7 +1527,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -1660,7 +1664,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -1797,7 +1801,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -1968,7 +1972,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -2118,7 +2122,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
checkValue(i, rmv ? null : i, cache.getName());
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -2208,7 +2212,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -2267,7 +2271,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -2340,7 +2344,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
checkValue(key2, 2, cache.getName());
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -2389,7 +2393,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- ignite0.destroyCache(cacheName);
+ destroyCache(cacheName);
}
}
@@ -2475,7 +2479,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
checkValue(key3, key3, cacheName);
}
finally {
- ignite0.destroyCache(cacheName);
+ destroyCache(cacheName);
}
}
@@ -2555,7 +2559,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
checkValue(key3, key3, cacheName);
}
finally {
- ignite0.destroyCache(cacheName);
+ destroyCache(cacheName);
}
}
@@ -2817,8 +2821,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- ignite0.destroyCache(CACHE1);
- ignite0.destroyCache(CACHE2);
+ destroyCache(CACHE1);
+ destroyCache(CACHE2);
}
}
@@ -2876,7 +2880,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite0, ccfg.getName());
+ destroyCache(ccfg.getName());
}
}
}
@@ -2975,30 +2979,118 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
caches.add(client.<Integer, Integer>cache(cacheName));
}
- IgniteInternalFuture<?> restartFut = null;
+ IgniteInternalFuture<?> restartFut = restart ? restartFuture(stop, null) : null;
+
+ for (int i = 0; i < 30; i++) {
+ final AtomicInteger cntr = new AtomicInteger();
+
+ final Integer key = i;
+
+ final AtomicInteger threadIdx = new AtomicInteger();
+
+ final int THREADS = 10;
+
+ final CyclicBarrier barrier = new CyclicBarrier(THREADS);
+
+ GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ int idx = threadIdx.getAndIncrement() % caches.size();
+
+ IgniteCache<Integer, Integer> cache = caches.get(idx);
+
+ Ignite ignite = cache.unwrap(Ignite.class);
- if (restart) {
- restartFut = GridTestUtils.runAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- while (!stop.get()) {
- stopGrid(0);
+ IgniteTransactions txs = ignite.transactions();
- U.sleep(300);
+ log.info("Started update thread: " + ignite.name());
+
+ barrier.await();
- Ignite ignite = startGrid(0);
+ for (int i = 0; i < 1000; i++) {
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
- assertFalse(ignite.configuration().isClientMode());
+ cache.put(key, val == null ? 1 : val + 1);
+
+ tx.commit();
+ }
+
+ cntr.incrementAndGet();
+ }
+ catch (TransactionOptimisticException ignore) {
+ // Retry.
+ }
+ catch (IgniteException | CacheException e) {
+ assertTrue("Unexpected exception [err=" + e + ", cause=" + e.getCause() + ']',
+ restart && X.hasCause(e, ClusterTopologyCheckedException.class));
+ }
}
return null;
}
- });
+ }, THREADS, "update-thread").get();
+
+ log.info("Iteration [iter=" + i + ", val=" + cntr.get() + ']');
+
+ assertTrue(cntr.get() > 0);
+
+ checkValue(key, cntr.get(), cacheName, restart);
}
- for (int i = 0; i < 30; i++) {
+ stop.set(true);
+
+ if (restartFut != null)
+ restartFut.get();
+ }
+ finally {
+ stop.set(true);
+
+ destroyCache(cacheName);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testIncrementTxMultipleNodeRestart() throws Exception {
+ incrementTxMultiple(false, false, true);
+ }
+
+ /**
+ * @param nearCache If {@code true} near cache is enabled.
+ * @param store If {@code true} cache store is enabled.
+ * @param restart If {@code true} restarts one node.
+ * @throws Exception If failed.
+ */
+ private void incrementTxMultiple(boolean nearCache, boolean store, final boolean restart) throws Exception {
+ final Ignite srv = ignite(1);
+
+ CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, store, false);
+
+ final List<Ignite> clients = clients();
+
+ final String cacheName = srv.createCache(ccfg).getName();
+
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ try {
+ final List<IgniteCache<Integer, Integer>> caches = new ArrayList<>();
+
+ for (Ignite client : clients) {
+ if (nearCache)
+ caches.add(client.createNearCache(cacheName, new NearCacheConfiguration<Integer, Integer>()));
+ else
+ caches.add(client.<Integer, Integer>cache(cacheName));
+ }
+
+ IgniteInternalFuture<?> restartFut = restart ? restartFuture(stop, null) : null;
+
+ for (int i = 0; i < 20; i += 2) {
final AtomicInteger cntr = new AtomicInteger();
- final Integer key = i;
+ final Integer key1 = i;
+ final Integer key2 = i + 1;
final AtomicInteger threadIdx = new AtomicInteger();
@@ -3006,6 +3098,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
final CyclicBarrier barrier = new CyclicBarrier(THREADS);
+ final ConcurrentSkipListSet<Integer> vals1 = new ConcurrentSkipListSet<>();
+ final ConcurrentSkipListSet<Integer> vals2 = new ConcurrentSkipListSet<>();
+
GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
int idx = threadIdx.getAndIncrement() % caches.size();
@@ -3023,11 +3118,19 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
for (int i = 0; i < 1000; i++) {
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
- Integer val = cache.get(key);
+ Integer val1 = cache.get(key1);
+ Integer val2 = cache.get(key2);
- cache.put(key, val == null ? 1 : val + 1);
+ Integer newVal1 = val1 == null ? 1 : val1 + 1;
+ Integer newVal2 = val2 == null ? 1 : val2 + 1;
+
+ cache.put(key1, newVal1);
+ cache.put(key2, newVal2);
tx.commit();
+
+ assertTrue(vals1.add(newVal1));
+ assertTrue(vals2.add(newVal2));
}
cntr.incrementAndGet();
@@ -3049,7 +3152,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
assertTrue(cntr.get() > 0);
- checkValue(key, cntr.get(), cacheName, restart);
+ checkValue(key1, cntr.get(), cacheName, restart);
+ checkValue(key2, cntr.get(), cacheName, restart);
}
stop.set(true);
@@ -3060,7 +3164,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
finally {
stop.set(true);
- destroyCache(srv, cacheName);
+ destroyCache(cacheName);
}
}
@@ -3189,7 +3293,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- ignite0.destroyCache(cacheName);
+ destroyCache(cacheName);
}
}
@@ -3229,6 +3333,13 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testAccountTxNodeRestart() throws Exception {
+ accountTx(false, false, false, true, TestMemoryMode.HEAP);
+ }
+
+ /**
* @param getAll If {@code true} uses getAll/putAll in transaction.
* @param nearCache If {@code true} near cache is enabled.
* @param nonSer If {@code true} starts threads executing non-serializable transactions.
@@ -3421,25 +3532,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}, THREADS, "tx-thread");
- IgniteInternalFuture<?> restartFut = null;
-
- if (restart) {
- restartFut = GridTestUtils.runAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- while (!fut.isDone()) {
- stopGrid(0);
-
- U.sleep(300);
-
- Ignite ignite = startGrid(0);
-
- assertFalse(ignite.configuration().isClientMode());
- }
-
- return null;
- }
- });
- }
+ IgniteInternalFuture<?> restartFut = restart ? restartFuture(null, fut) : null;
fut.get(testTime + 30_000);
@@ -3506,7 +3599,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- srv.destroyCache(cacheName);
+ destroyCache(cacheName);
}
}
@@ -3558,21 +3651,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
cacheNames.add(ccfg.getName());
}
- IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- while (!finished.get()) {
- stopGrid(0);
-
- U.sleep(300);
-
- Ignite ignite = startGrid(0);
-
- assertFalse(ignite.configuration().isClientMode());
- }
-
- return null;
- }
- });
+ IgniteInternalFuture<?> restartFut = restartFuture(finished, null);
List<IgniteInternalFuture<?>> futs = new ArrayList<>();
@@ -3653,7 +3732,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
finished.set(true);
for (String cacheName : cacheNames)
- srv.destroyCache(cacheName);
+ destroyCache(cacheName);
}
}
@@ -3710,7 +3789,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- ignite.destroyCache(cacheName);
+ destroyCache(cacheName);
}
}
@@ -3785,27 +3864,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
final AtomicBoolean finished = new AtomicBoolean();
- IgniteInternalFuture<Object> fut = null;
+ IgniteInternalFuture<?> fut = restart ? restartFuture(finished, null) : null;
try {
- if (restart) {
- fut = GridTestUtils.runAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- while (!finished.get()) {
- stopGrid(0);
-
- U.sleep(300);
-
- Ignite ignite = startGrid(0);
-
- assertFalse(ignite.configuration().isClientMode());
- }
-
- return null;
- }
- });
- }
-
for (int i = 0; i < 10; i++) {
log.info("Iteration: " + i);
@@ -3957,7 +4018,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(srv, cacheName);
+ destroyCache(cacheName);
}
}
@@ -4152,16 +4213,20 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
/**
- * @param ignite Node.
* @param cacheName Cache name.
*/
- private void destroyCache(Ignite ignite, String cacheName) {
+ private void destroyCache(String cacheName) {
storeMap.clear();
- ignite.destroyCache(cacheName);
+ for (Ignite ignite : G.allGrids()) {
+ try {
+ ignite.destroyCache(cacheName);
+ }
+ catch (IgniteException ignore) {
+ // No-op.
+ }
- for (Ignite ignite0 : G.allGrids()) {
- GridTestSwapSpaceSpi spi = (GridTestSwapSpaceSpi)ignite0.configuration().getSwapSpaceSpi();
+ GridTestSwapSpaceSpi spi = (GridTestSwapSpaceSpi)ignite.configuration().getSwapSpaceSpi();
spi.clearAll();
}
@@ -4220,6 +4285,36 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
/**
+ * @param stop Stop flag.
+ * @param fut Future.
+ * @return Restart thread future.
+ */
+ private IgniteInternalFuture<?> restartFuture(final AtomicBoolean stop, final IgniteInternalFuture<?> fut) {
+ return GridTestUtils.runAsync(new Callable<Object>() {
+ private boolean stop() {
+ if (stop != null)
+ return stop.get();
+
+ return fut.isDone();
+ }
+
+ @Override public Object call() throws Exception {
+ while (!stop()) {
+ Ignite ignite = startGrid(SRVS + CLIENTS);
+
+ assertFalse(ignite.configuration().isClientMode());
+
+ U.sleep(300);
+
+ stopGrid(SRVS + CLIENTS);
+ }
+
+ return null;
+ }
+ }, "restart-thread");
+ }
+
+ /**
*
*/
private static class TestStoreFactory implements Factory<CacheStore<Integer, Integer>> {