You are viewing a plain text version of this content. The canonical link for it was not preserved in this copy.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/30 10:52:49 UTC
ignite git commit: ignite-1607 restart test
Repository: ignite
Updated Branches:
refs/heads/ignite-1607 [created] 15432ae94
ignite-1607 restart test
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/15432ae9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/15432ae9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/15432ae9
Branch: refs/heads/ignite-1607
Commit: 15432ae9473779b5b054fe41e94f919698c836fd
Parents: 303def3
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 29 12:54:07 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Oct 30 12:18:57 2015 +0300
----------------------------------------------------------------------
.../distributed/dht/GridDhtTxPrepareFuture.java | 44 +++---
...arOptimisticSerializableTxPrepareFuture.java | 12 +-
.../cache/transactions/IgniteTxManager.java | 4 +-
.../CacheSerializableTransactionsTest.java | 151 +++++++++++++++++++
4 files changed, 187 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/15432ae9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index d806801..61975d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -573,27 +573,35 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (tx.onePhaseCommit() && tx.commitOnPrepare()) {
assert last;
+ Throwable prepErr = this.err.get();
+
// Must create prepare response before transaction is committed to grab correct return value.
- final GridNearTxPrepareResponse res = createPrepareResponse();
+ final GridNearTxPrepareResponse res = createPrepareResponse(prepErr);
onComplete(res);
if (tx.commitOnPrepare()) {
if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) {
- IgniteInternalFuture<IgniteInternalTx> fut = this.err.get() == null ?
- tx.commitAsync() : tx.rollbackAsync();
+ IgniteInternalFuture<IgniteInternalTx> fut = null;
- fut.listen(new CIX1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
- try {
- if (replied.compareAndSet(false, true))
- sendPrepareResponse(res);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send prepare response for transaction: " + tx, e);
+ if (prepErr == null)
+ fut = tx.commitAsync();
+ else if (!cctx.kernalContext().isStopping())
+ fut = tx.rollbackAsync();
+
+ if (fut != null) {
+ fut.listen(new CIX1<IgniteInternalFuture<IgniteInternalTx>>() {
+ @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
+ try {
+ if (replied.compareAndSet(false, true))
+ sendPrepareResponse(res);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send prepare response for transaction: " + tx, e);
+ }
}
- }
- });
+ });
+ }
}
}
else {
@@ -610,7 +618,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
else {
if (replied.compareAndSet(false, true)) {
- GridNearTxPrepareResponse res = createPrepareResponse();
+ GridNearTxPrepareResponse res = createPrepareResponse(this.err.get());
try {
sendPrepareResponse(res);
@@ -659,12 +667,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
/**
+ * @param prepErr Error.
* @return Prepare response.
*/
- private GridNearTxPrepareResponse createPrepareResponse() {
- // Send reply back to originating near node.
- Throwable prepErr = err.get();
-
+ private GridNearTxPrepareResponse createPrepareResponse(@Nullable Throwable prepErr) {
assert F.isEmpty(tx.invalidPartitions());
GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
@@ -981,7 +987,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (err0 != null) {
err.compareAndSet(null, err0);
- final GridNearTxPrepareResponse res = createPrepareResponse();
+ final GridNearTxPrepareResponse res = createPrepareResponse(err.get());
tx.rollbackAsync().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
@Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/15432ae9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 47c1d21..5488bb1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -809,18 +809,22 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
remap(res);
}
catch (IgniteCheckedException e) {
+ err.compareAndSet(null, e);
+
onDone(e);
}
}
});
}
else {
- ClusterTopologyCheckedException err = new ClusterTopologyCheckedException(
+ ClusterTopologyCheckedException err0 = new ClusterTopologyCheckedException(
"Cluster topology changed while client transaction is preparing.");
- err.retryReadyFuture(affFut);
+ err0.retryReadyFuture(affFut);
+
+ err.compareAndSet(null, err0);
- onDone(err);
+ onDone(err0);
}
}
catch (IgniteCheckedException e) {
@@ -829,6 +833,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
GridNearOptimisticSerializableTxPrepareFuture.this);
}
+ err.compareAndSet(null, e);
+
onDone(e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/15432ae9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index c1e9202..1f51b8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1617,8 +1617,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
{
for (final IgniteInternalTx tx : txs()) {
if (nearVer.equals(tx.nearXidVersion())) {
- TransactionState state = tx.state();
-
IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();
if (prepFut != null && !prepFut.isDone()) {
@@ -1648,6 +1646,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
return fut0;
}
+ TransactionState state = tx.state();
+
if (state == PREPARED || state == COMMITTING || state == COMMITTED) {
if (--txNum == 0) {
if (fut != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/15432ae9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index 8c135ad..7d37b24 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
@@ -107,6 +108,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
/** */
private boolean client;
+ /** */
+ private static int cacheId;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -126,6 +130,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
+ cacheId = 0;
+
startGridsMultiThreaded(SRVS);
client = true;
@@ -3067,6 +3073,142 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testIncrementTxMultipleNodeRestart() throws Exception {
+ incrementTxMultiple(false, false, true);
+ }
+
+ /**
+ * @param nearCache If {@code true} near cache is enabled.
+ * @param store If {@code true} cache store is enabled.
+ * @param restart If {@code true} restarts one node.
+ * @throws Exception If failed.
+ */
+ private void incrementTxMultiple(boolean nearCache, boolean store, final boolean restart) throws Exception {
+ final Ignite srv = ignite(1);
+
+ CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, store, false);
+
+ final List<Ignite> clients = clients();
+
+ final String cacheName = srv.createCache(ccfg).getName();
+
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ try {
+ final List<IgniteCache<Integer, Integer>> caches = new ArrayList<>();
+
+ for (Ignite client : clients) {
+ if (nearCache)
+ caches.add(client.createNearCache(cacheName, new NearCacheConfiguration<Integer, Integer>()));
+ else
+ caches.add(client.<Integer, Integer>cache(cacheName));
+ }
+
+ IgniteInternalFuture<?> restartFut = null;
+
+ if (restart) {
+ restartFut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ while (!stop.get()) {
+ stopGrid(0);
+
+ U.sleep(300);
+
+ Ignite ignite = startGrid(0);
+
+ assertFalse(ignite.configuration().isClientMode());
+ }
+
+ return null;
+ }
+ });
+ }
+
+ for (int i = 0; i < 30; i += 2) {
+ final AtomicInteger cntr = new AtomicInteger();
+
+ final Integer key1 = i;
+ final Integer key2 = i + 1;
+
+ final AtomicInteger threadIdx = new AtomicInteger();
+
+ final int THREADS = 10;
+
+ final CyclicBarrier barrier = new CyclicBarrier(THREADS);
+
+ final ConcurrentSkipListSet<Integer> vals1 = new ConcurrentSkipListSet<>();
+ final ConcurrentSkipListSet<Integer> vals2 = new ConcurrentSkipListSet<>();
+
+ GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ int idx = threadIdx.getAndIncrement() % caches.size();
+
+ IgniteCache<Integer, Integer> cache = caches.get(idx);
+
+ Ignite ignite = cache.unwrap(Ignite.class);
+
+ IgniteTransactions txs = ignite.transactions();
+
+ log.info("Started update thread: " + ignite.name());
+
+ barrier.await();
+
+ for (int i = 0; i < 1000; i++) {
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val1 = cache.get(key1);
+ Integer val2 = cache.get(key2);
+
+ Integer newVal1 = val1 == null ? 1 : val1 + 1;
+ Integer newVal2 = val2 == null ? 1 : val2 + 1;
+
+ cache.put(key1, newVal1);
+ cache.put(key2, newVal2);
+
+ tx.commit();
+
+ assertTrue(vals1.add(newVal1));
+ assertTrue(vals2.add(newVal2));
+ }
+
+ cntr.incrementAndGet();
+ }
+ catch (TransactionOptimisticException ignore) {
+ // Retry.
+ }
+ catch (IgniteException | CacheException e) {
+ assertTrue("Unexpected exception [err=" + e + ", cause=" + e.getCause() + ']',
+ restart && X.hasCause(e, ClusterTopologyCheckedException.class));
+ }
+ }
+
+ return null;
+ }
+ }, THREADS, "update-thread").get();
+
+ log.info("Iteration [iter=" + i + ", val=" + cntr.get() + ']');
+
+ assertTrue(cntr.get() > 0);
+
+ checkValue(key1, cntr.get(), cacheName, restart);
+ checkValue(key2, cntr.get(), cacheName, restart);
+ }
+
+ stop.set(true);
+
+ if (restartFut != null)
+ restartFut.get();
+ }
+ finally {
+ stop.set(true);
+
+ destroyCache(srv, cacheName);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testGetRemoveTx() throws Exception {
getRemoveTx(false, false);
}
@@ -3229,6 +3371,13 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testAccountTxNodeRestart() throws Exception {
+ accountTx(false, false, false, true, TestMemoryMode.HEAP);
+ }
+
+ /**
* @param getAll If {@code true} uses getAll/putAll in transaction.
* @param nearCache If {@code true} near cache is enabled.
* @param nonSer If {@code true} starts threads executing non-serializable transactions.
@@ -4183,6 +4332,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
boolean nearCache) {
CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+ ccfg.setName("testCache-" + cacheId++);
+
ccfg.setCacheMode(cacheMode);
ccfg.setAtomicityMode(TRANSACTIONAL);
ccfg.setWriteSynchronizationMode(syncMode);