You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/30 10:52:49 UTC

ignite git commit: ignite-1607 restart test

Repository: ignite
Updated Branches:
  refs/heads/ignite-1607 [created] 15432ae94


ignite-1607 restart test


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/15432ae9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/15432ae9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/15432ae9

Branch: refs/heads/ignite-1607
Commit: 15432ae9473779b5b054fe41e94f919698c836fd
Parents: 303def3
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 29 12:54:07 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Oct 30 12:18:57 2015 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtTxPrepareFuture.java |  44 +++---
 ...arOptimisticSerializableTxPrepareFuture.java |  12 +-
 .../cache/transactions/IgniteTxManager.java     |   4 +-
 .../CacheSerializableTransactionsTest.java      | 151 +++++++++++++++++++
 4 files changed, 187 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/15432ae9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index d806801..61975d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -573,27 +573,35 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         if (tx.onePhaseCommit() && tx.commitOnPrepare()) {
             assert last;
 
+            Throwable prepErr = this.err.get();
+
             // Must create prepare response before transaction is committed to grab correct return value.
-            final GridNearTxPrepareResponse res = createPrepareResponse();
+            final GridNearTxPrepareResponse res = createPrepareResponse(prepErr);
 
             onComplete(res);
 
             if (tx.commitOnPrepare()) {
                 if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) {
-                    IgniteInternalFuture<IgniteInternalTx> fut = this.err.get() == null ?
-                        tx.commitAsync() : tx.rollbackAsync();
+                    IgniteInternalFuture<IgniteInternalTx> fut = null;
 
-                    fut.listen(new CIX1<IgniteInternalFuture<IgniteInternalTx>>() {
-                        @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
-                            try {
-                                if (replied.compareAndSet(false, true))
-                                    sendPrepareResponse(res);
-                            }
-                            catch (IgniteCheckedException e) {
-                                U.error(log, "Failed to send prepare response for transaction: " + tx, e);
+                    if (prepErr == null)
+                        fut = tx.commitAsync();
+                    else if (!cctx.kernalContext().isStopping())
+                        fut = tx.rollbackAsync();
+
+                    if (fut != null) {
+                        fut.listen(new CIX1<IgniteInternalFuture<IgniteInternalTx>>() {
+                            @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
+                                try {
+                                    if (replied.compareAndSet(false, true))
+                                        sendPrepareResponse(res);
+                                }
+                                catch (IgniteCheckedException e) {
+                                    U.error(log, "Failed to send prepare response for transaction: " + tx, e);
+                                }
                             }
-                        }
-                    });
+                        });
+                    }
                 }
             }
             else {
@@ -610,7 +618,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         }
         else {
             if (replied.compareAndSet(false, true)) {
-                GridNearTxPrepareResponse res = createPrepareResponse();
+                GridNearTxPrepareResponse res = createPrepareResponse(this.err.get());
 
                 try {
                     sendPrepareResponse(res);
@@ -659,12 +667,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     }
 
     /**
+     * @param prepErr Error.
      * @return Prepare response.
      */
-    private GridNearTxPrepareResponse createPrepareResponse() {
-        // Send reply back to originating near node.
-        Throwable prepErr = err.get();
-
+    private GridNearTxPrepareResponse createPrepareResponse(@Nullable Throwable prepErr) {
         assert F.isEmpty(tx.invalidPartitions());
 
         GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
@@ -981,7 +987,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 if (err0 != null) {
                     err.compareAndSet(null, err0);
 
-                    final GridNearTxPrepareResponse res = createPrepareResponse();
+                    final GridNearTxPrepareResponse res = createPrepareResponse(err.get());
 
                     tx.rollbackAsync().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
                         @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/15432ae9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 47c1d21..5488bb1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -809,18 +809,22 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                                                         remap(res);
                                                     }
                                                     catch (IgniteCheckedException e) {
+                                                        err.compareAndSet(null, e);
+
                                                         onDone(e);
                                                     }
                                                 }
                                             });
                                         }
                                         else {
-                                            ClusterTopologyCheckedException err = new ClusterTopologyCheckedException(
+                                            ClusterTopologyCheckedException err0 = new ClusterTopologyCheckedException(
                                                 "Cluster topology changed while client transaction is preparing.");
 
-                                            err.retryReadyFuture(affFut);
+                                            err0.retryReadyFuture(affFut);
+
+                                            err.compareAndSet(null, err0);
 
-                                            onDone(err);
+                                            onDone(err0);
                                         }
                                     }
                                     catch (IgniteCheckedException e) {
@@ -829,6 +833,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                                                 GridNearOptimisticSerializableTxPrepareFuture.this);
                                         }
 
+                                        err.compareAndSet(null, e);
+
                                         onDone(e);
                                     }
                                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/15432ae9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index c1e9202..1f51b8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1617,8 +1617,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     {
         for (final IgniteInternalTx tx : txs()) {
             if (nearVer.equals(tx.nearXidVersion())) {
-                TransactionState state = tx.state();
-
                 IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();
 
                 if (prepFut != null && !prepFut.isDone()) {
@@ -1648,6 +1646,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                     return fut0;
                 }
 
+                TransactionState state = tx.state();
+
                 if (state == PREPARED || state == COMMITTING || state == COMMITTED) {
                     if (--txNum == 0) {
                         if (fut != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/15432ae9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index 8c135ad..7d37b24 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ThreadLocalRandom;
@@ -107,6 +108,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
     /** */
     private boolean client;
 
+    /** Counter appended to generated cache names ("testCache-N"); reset in beforeTestsStarted(). */
+    private static int cacheId;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -126,6 +130,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
+        cacheId = 0;
+
         startGridsMultiThreaded(SRVS);
 
         client = true;
@@ -3067,6 +3073,142 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testIncrementTxMultipleNodeRestart() throws Exception {
+        incrementTxMultiple(false, false, true); // nearCache=false, store=false, restart=true.
+    }
+
+    /**
+     * @param nearCache If {@code true} clients access the cache through a near cache.
+     * @param store If {@code true} cache store is enabled.
+     * @param restart If {@code true} a background thread keeps restarting node 0 during the test.
+     * @throws Exception If failed.
+     */
+    private void incrementTxMultiple(boolean nearCache, boolean store, final boolean restart) throws Exception {
+        final Ignite srv = ignite(1); // Node 1 creates/destroys the cache; node 0 is the one restarted below.
+
+        CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, store, false);
+
+        final List<Ignite> clients = clients();
+
+        final String cacheName = srv.createCache(ccfg).getName();
+
+        final AtomicBoolean stop = new AtomicBoolean(); // Tells the restart thread to exit.
+
+        try {
+            final List<IgniteCache<Integer, Integer>> caches = new ArrayList<>();
+
+            for (Ignite client : clients) {
+                if (nearCache)
+                    caches.add(client.createNearCache(cacheName, new NearCacheConfiguration<Integer, Integer>()));
+                else
+                    caches.add(client.<Integer, Integer>cache(cacheName));
+            }
+
+            IgniteInternalFuture<?> restartFut = null;
+
+            if (restart) {
+                restartFut = GridTestUtils.runAsync(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        while (!stop.get()) { // Keep bouncing node 0 until the test signals stop.
+                            stopGrid(0);
+
+                            U.sleep(300); // Keep node 0 down for a short while before restarting it.
+
+                            Ignite ignite = startGrid(0);
+
+                            assertFalse(ignite.configuration().isClientMode()); // Must come back as a server.
+                        }
+
+                        return null;
+                    }
+                });
+            }
+
+            for (int i = 0; i < 30; i += 2) { // Each iteration works on a fresh key pair (i, i + 1).
+                final AtomicInteger cntr = new AtomicInteger(); // Transactions committed in this iteration.
+
+                final Integer key1 = i;
+                final Integer key2 = i + 1;
+
+                final AtomicInteger threadIdx = new AtomicInteger(); // Spreads threads over client caches.
+
+                final int THREADS = 10;
+
+                final CyclicBarrier barrier = new CyclicBarrier(THREADS); // Aligns the start of all threads.
+
+                final ConcurrentSkipListSet<Integer> vals1 = new ConcurrentSkipListSet<>(); // Committed key1 values.
+                final ConcurrentSkipListSet<Integer> vals2 = new ConcurrentSkipListSet<>(); // Committed key2 values.
+
+                GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        int idx = threadIdx.getAndIncrement() % caches.size(); // Round-robin cache choice.
+
+                        IgniteCache<Integer, Integer> cache = caches.get(idx);
+
+                        Ignite ignite = cache.unwrap(Ignite.class);
+
+                        IgniteTransactions txs = ignite.transactions();
+
+                        log.info("Started update thread: " + ignite.name());
+
+                        barrier.await();
+
+                        for (int i = 0; i < 1000; i++) { // Up to 1000 commit attempts per thread.
+                            try {
+                                try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                                    Integer val1 = cache.get(key1);
+                                    Integer val2 = cache.get(key2);
+
+                                    Integer newVal1 = val1 == null ? 1 : val1 + 1;
+                                    Integer newVal2 = val2 == null ? 1 : val2 + 1;
+
+                                    cache.put(key1, newVal1);
+                                    cache.put(key2, newVal2);
+
+                                    tx.commit();
+
+                                    assertTrue(vals1.add(newVal1)); // A committed value must never repeat.
+                                    assertTrue(vals2.add(newVal2));
+                                }
+
+                                cntr.incrementAndGet(); // Reached only when commit and both asserts succeeded.
+                            }
+                            catch (TransactionOptimisticException ignore) {
+                                // Optimistic conflict: drop this attempt, the next loop iteration tries again.
+                            }
+                            catch (IgniteException | CacheException e) { // Tolerated only as restart fallout.
+                                assertTrue("Unexpected exception [err=" + e + ", cause=" + e.getCause() + ']',
+                                    restart && X.hasCause(e, ClusterTopologyCheckedException.class));
+                            }
+                        }
+
+                        return null;
+                    }
+                }, THREADS, "update-thread").get(); // Wait for all update threads of this iteration.
+
+                log.info("Iteration [iter=" + i + ", val=" + cntr.get() + ']');
+
+                assertTrue(cntr.get() > 0);
+
+                checkValue(key1, cntr.get(), cacheName, restart); // Expected value is the commit count.
+                checkValue(key2, cntr.get(), cacheName, restart);
+            }
+
+            stop.set(true);
+
+            if (restartFut != null)
+                restartFut.get(); // Success path: wait for the restart thread and surface its failure.
+        }
+        finally {
+            stop.set(true); // NOTE(review): failure path does not await restartFut; a restart may be in flight.
+
+            destroyCache(srv, cacheName);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testGetRemoveTx() throws Exception {
         getRemoveTx(false, false);
     }
@@ -3229,6 +3371,13 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testAccountTxNodeRestart() throws Exception {
+        accountTx(false, false, false, true, TestMemoryMode.HEAP); // 4th flag presumably 'restart' - confirm.
+    }
+
+    /**
      * @param getAll If {@code true} uses getAll/putAll in transaction.
      * @param nearCache If {@code true} near cache is enabled.
      * @param nonSer If {@code true} starts threads executing non-serializable transactions.
@@ -4183,6 +4332,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
         boolean nearCache) {
         CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
 
+        ccfg.setName("testCache-" + cacheId++);
+
         ccfg.setCacheMode(cacheMode);
         ccfg.setAtomicityMode(TRANSACTIONAL);
         ccfg.setWriteSynchronizationMode(syncMode);