You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/15 10:02:11 UTC

ignite git commit: ignite-971 Fix retry for 'checkBackup' tx failure.

Repository: ignite
Updated Branches:
  refs/heads/ignite-971 cef56589e -> 1c57db9a5


ignite-971 Fix retry for 'checkBackup' tx failure.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1c57db9a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1c57db9a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1c57db9a

Branch: refs/heads/ignite-971
Commit: 1c57db9a5f81f08c03ad10208a15c2cb352099d6
Parents: cef5658
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 15 10:49:16 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 15 11:01:14 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 21 ++++-----
 .../dht/GridDhtPartitionTopologyImpl.java       |  9 ++--
 .../near/GridNearOptimisticTxPrepareFuture.java |  3 ++
 .../near/GridNearTxFinishFuture.java            | 32 +++++++++++---
 .../cache/transactions/IgniteTxHandler.java     |  9 +++-
 .../IgniteCachePutRetryAbstractSelfTest.java    | 46 ++++++++++++++------
 .../dht/IgniteCachePutRetryAtomicSelfTest.java  |  2 +-
 ...gniteCachePutRetryTransactionalSelfTest.java | 16 +++++--
 8 files changed, 100 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1c57db9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9329e94..1fc94ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4096,21 +4096,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
                     return t;
                 }
-                catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException |
-                    IgniteTxRollbackCheckedException e) {
+                catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException e) {
                     throw e;
                 }
                 catch (IgniteCheckedException e) {
-                    try {
-                        tx.rollback();
+                    if (!(e instanceof IgniteTxRollbackCheckedException)) {
+                        try {
+                            tx.rollback();
 
-                        e = new IgniteTxRollbackCheckedException("Transaction has been rolled back: " +
-                            tx.xid(), e);
-                    }
-                    catch (IgniteCheckedException | AssertionError | RuntimeException e1) {
-                        U.error(log, "Failed to rollback transaction (cache may contain stale locks): " + tx, e1);
+                            e = new IgniteTxRollbackCheckedException("Transaction has been rolled back: " +
+                                tx.xid(), e);
+                        }
+                        catch (IgniteCheckedException | AssertionError | RuntimeException e1) {
+                            U.error(log, "Failed to rollback transaction (cache may contain stale locks): " + tx, e1);
 
-                        U.addLastCause(e, e1, log);
+                            U.addLastCause(e, e1, log);
+                        }
                     }
 
                     if (X.hasCause(e, ClusterTopologyCheckedException.class) && i != retries - 1) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c57db9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 5e183e9..fcb012f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -546,10 +546,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @param updateSeq Update sequence.
      * @return Local partition.
      */
-    private GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, boolean create, boolean updateSeq) {
-        while (true) {
-            boolean belongs = cctx.affinity().localNode(p, topVer);
+    private GridDhtLocalPartition localPartition(int p,
+        AffinityTopologyVersion topVer,
+        boolean create,
+        boolean updateSeq) {
+        boolean belongs = create && cctx.affinity().localNode(p, topVer);
 
+        while (true) {
             GridDhtLocalPartition loc = locParts.get(p);
 
             if (loc != null && loc.state() == EVICTED) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c57db9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 3f9decf..2048fdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -234,6 +234,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
 
     /**
      * Completeness callback.
+     *
+     * @return {@code True} if future was finished by this call.
      */
     private boolean onComplete() {
         Throwable err0 = err.get();
@@ -457,6 +459,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
     /**
      * @param reads Read entries.
      * @param writes Write entries.
+     * @throws IgniteCheckedException If failed.
      */
     private void prepare(
         Iterable<IgniteTxEntry> reads,

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c57db9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 21aaef2..ab6dc3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -404,8 +404,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 if (backup == null) {
                     readyNearMappingFromBackup(mapping);
 
+                    ClusterTopologyCheckedException cause =
+                        new ClusterTopologyCheckedException("Backup node left grid: " + backupId);
+
+                    cause.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
                     mini.onDone(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
-                        "(backup has left grid): " + tx.xidVersion()));
+                        "(backup has left grid): " + tx.xidVersion(), cause));
                 }
                 else if (backup.isLocal()) {
                     boolean committed = cctx.tm().txHandler().checkDhtRemoteTxCommitted(tx.xidVersion());
@@ -414,9 +419,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
                     if (committed)
                         mini.onDone(tx);
-                    else
+                    else {
+                        ClusterTopologyCheckedException cause =
+                            new ClusterTopologyCheckedException("Primary node left grid: " + nodeId);
+
+                        cause.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
                         mini.onDone(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
-                            "(transaction has been rolled back on backup node): " + tx.xidVersion()));
+                            "(transaction has been rolled back on backup node): " + tx.xidVersion(), cause));
+                    }
                 }
                 else {
                     GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
@@ -731,8 +742,19 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
             readyNearMappingFromBackup(m);
 
-            if (res.checkCommittedError() != null)
-                onDone(res.checkCommittedError());
+            Throwable err = res.checkCommittedError();
+
+            if (err != null) {
+                if (err instanceof IgniteCheckedException) {
+                    ClusterTopologyCheckedException cause =
+                        ((IgniteCheckedException)err).getCause(ClusterTopologyCheckedException.class);
+
+                    if (cause != null)
+                        cause.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+                }
+
+                onDone(err);
+            }
             else
                 onDone(tx);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c57db9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 9efa43a..756672a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1049,6 +1049,7 @@ public class IgniteTxHandler {
      *
      * @param nodeId Node id that originated finish request.
      * @param req Request.
+     * @param committed {@code True} if transaction committed on this node.
      */
     protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed) {
         if (req.replyRequired()) {
@@ -1057,9 +1058,13 @@ public class IgniteTxHandler {
             if (req.checkCommitted()) {
                 res.checkCommitted(true);
 
-                if (!committed)
+                if (!committed) {
+                    ClusterTopologyCheckedException cause =
+                        new ClusterTopologyCheckedException("Primary node left grid.");
+
                     res.checkCommittedError(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
-                        "(transaction has been rolled back on backup node): " + req.version()));
+                        "(transaction has been rolled back on backup node): " + req.version(), cause));
+                }
             }
 
             try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c57db9a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index 966e75a..c20e231 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -81,10 +81,12 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
 
     /**
      * @param memMode Memory mode.
+     * @param store If {@code true} adds cache store.
      * @return Cache configuration.
      * @throws Exception If failed.
      */
-    protected CacheConfiguration cacheConfiguration(TestMemoryMode memMode) throws Exception {
+    @SuppressWarnings("unchecked")
+    protected CacheConfiguration cacheConfiguration(TestMemoryMode memMode, boolean store) throws Exception {
         CacheConfiguration cfg = new CacheConfiguration();
 
         cfg.setAtomicityMode(atomicityMode());
@@ -92,8 +94,11 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
         cfg.setAtomicWriteOrderMode(writeOrderMode());
         cfg.setBackups(1);
         cfg.setRebalanceMode(SYNC);
-        cfg.setCacheStoreFactory(new TestStoreFactory());
-        cfg.setWriteThrough(true);
+
+        if (store) {
+            cfg.setCacheStoreFactory(new TestStoreFactory());
+            cfg.setWriteThrough(true);
+        }
 
         GridTestUtils.setMemoryMode(null, cfg, memMode, 100, 1024);
 
@@ -154,58 +159,73 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
      * @throws Exception If failed.
      */
     public void testPut() throws Exception {
-        checkRetry(Test.PUT, TestMemoryMode.HEAP);
+        checkRetry(Test.PUT, TestMemoryMode.HEAP, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutStoreEnabled() throws Exception {
+        checkRetry(Test.PUT, TestMemoryMode.HEAP, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPutAll() throws Exception {
-        checkRetry(Test.PUT_ALL, TestMemoryMode.HEAP);
+        checkRetry(Test.PUT_ALL, TestMemoryMode.HEAP, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPutAsync() throws Exception {
-        checkRetry(Test.PUT_ASYNC, TestMemoryMode.HEAP);
+        checkRetry(Test.PUT_ASYNC, TestMemoryMode.HEAP, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAsyncStoreEnabled() throws Exception {
+        checkRetry(Test.PUT_ASYNC, TestMemoryMode.HEAP, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testInvoke() throws Exception {
-        checkRetry(Test.INVOKE, TestMemoryMode.HEAP);
+        checkRetry(Test.INVOKE, TestMemoryMode.HEAP, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testInvokeAll() throws Exception {
-        checkRetry(Test.INVOKE_ALL, TestMemoryMode.HEAP);
+        checkRetry(Test.INVOKE_ALL, TestMemoryMode.HEAP, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testInvokeAllOffheapSwap() throws Exception {
-        checkRetry(Test.INVOKE_ALL, TestMemoryMode.OFFHEAP_EVICT_SWAP);
+        checkRetry(Test.INVOKE_ALL, TestMemoryMode.OFFHEAP_EVICT_SWAP, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testInvokeAllOffheapTiered() throws Exception {
-        checkRetry(Test.INVOKE_ALL, TestMemoryMode.OFFHEAP_TIERED);
+        checkRetry(Test.INVOKE_ALL, TestMemoryMode.OFFHEAP_TIERED, false);
     }
 
     /**
      * @param test Test type.
      * @param memMode Memory mode.
+     * @param store If {@code true} uses cache with store.
      * @throws Exception If failed.
      */
-    private void checkRetry(Test test, TestMemoryMode memMode) throws Exception {
-        ignite(0).createCache(cacheConfiguration(memMode));
+    private void checkRetry(Test test, TestMemoryMode memMode, boolean store) throws Exception {
+        ignite(0).createCache(cacheConfiguration(memMode, store));
 
         final AtomicBoolean finished = new AtomicBoolean();
 
@@ -393,7 +413,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
      * @throws Exception If failed.
      */
     private void checkFailsWithNoRetries(boolean async) throws Exception {
-        ignite(0).createCache(cacheConfiguration(TestMemoryMode.HEAP));
+        ignite(0).createCache(cacheConfiguration(TestMemoryMode.HEAP, false));
 
         final AtomicBoolean finished = new AtomicBoolean();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c57db9a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
index 17d0af1..3d7c7d7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
@@ -47,7 +47,7 @@ public class IgniteCachePutRetryAtomicSelfTest extends IgniteCachePutRetryAbstra
      * @throws Exception If failed.
      */
     public void testPutInsideTransaction() throws Exception {
-        ignite(0).createCache(cacheConfiguration(GridTestUtils.TestMemoryMode.HEAP));
+        ignite(0).createCache(cacheConfiguration(GridTestUtils.TestMemoryMode.HEAP, false));
 
         CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c57db9a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index fecb16b..f61faf2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -92,23 +92,31 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
      * @throws Exception If failed.
      */
     public void testExplicitTransactionRetries() throws Exception {
-        explicitTransactionRetries(TestMemoryMode.HEAP);
+        explicitTransactionRetries(TestMemoryMode.HEAP, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testExplicitTransactionRetriesStoreEnabled() throws Exception {
+        explicitTransactionRetries(TestMemoryMode.HEAP, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testExplicitTransactionRetriesOffheapSwap() throws Exception {
-        explicitTransactionRetries(TestMemoryMode.OFFHEAP_EVICT_SWAP);
+        explicitTransactionRetries(TestMemoryMode.OFFHEAP_EVICT_SWAP, false);
     }
 
     /**
      * @param memMode Memory mode.
+     * @param store If {@code true} uses cache with store.
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")
-    public void explicitTransactionRetries(TestMemoryMode memMode) throws Exception {
-        ignite(0).createCache(cacheConfiguration(memMode));
+    public void explicitTransactionRetries(TestMemoryMode memMode, boolean store) throws Exception {
+        ignite(0).createCache(cacheConfiguration(memMode, store));
 
         final AtomicInteger idx = new AtomicInteger();
         int threads = 8;