You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/06 11:45:13 UTC

[08/14] incubator-ignite git commit: # ignite-157-2

# ignite-157-2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/65099366
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/65099366
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/65099366

Branch: refs/heads/ignite-sprint-4
Commit: 65099366d767073de9a1e7a5d1b5ed67b4306fe8
Parents: fbf7149
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 5 10:35:20 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 5 13:50:49 2015 +0300

----------------------------------------------------------------------
 ...ridCacheOptimisticCheckPreparedTxFuture.java |  14 +-
 .../cache/transactions/IgniteTxManager.java     |  50 +++----
 ...ePrimaryNodeFailureRecoveryAbstractTest.java | 147 ++++++++++++++-----
 .../ignite/testsuites/IgniteCacheTestSuite.java |   3 -
 4 files changed, 150 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65099366/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
index 3e345f4..bd3e1cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
@@ -163,6 +163,9 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
                 try {
                     cctx.io().send(nearNodeId, req, tx.ioPolicy());
                 }
+                catch (ClusterTopologyCheckedException e) {
+                    fut.onNodeLeft();
+                }
                 catch (IgniteCheckedException e) {
                     fut.onError(e);
                 }
@@ -398,7 +401,7 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
                 cctx.tm().finishOptimisticTxOnRecovery(tx, res);
             }
             else {
-                if (nearTxCheck) {
+                if (err instanceof ClusterTopologyCheckedException && nearTxCheck) {
                     if (log.isDebugEnabled())
                         log.debug("Failed to check transaction on near node, " +
                             "ignoring [err=" + err + ", tx=" + tx + ']');
@@ -480,7 +483,14 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
             if (log.isDebugEnabled())
                 log.debug("Transaction node left grid (will ignore) [fut=" + this + ']');
 
-            onDone(true);
+            if (nearTxCheck) {
+                // Near and originating nodes left, need initiate tx check.
+                cctx.tm().commitIfPrepared(tx);
+
+                onDone(new ClusterTopologyCheckedException("Transaction node left grid (will ignore)."));
+            }
+            else
+                onDone(true);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65099366/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 19efc5d..85b3ad0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -2053,6 +2053,31 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * Commits optimistic transaction in case when node started transaction failed, but all related
+     * transactions were prepared (invalidates transaction if it is not fully prepared).
+     *
+     * @param tx Transaction.
+     */
+    public void commitIfPrepared(IgniteInternalTx tx) {
+        assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote  : tx;
+        assert !F.isEmpty(tx.transactionNodes()) : tx;
+        assert tx.nearXidVersion() != null : tx;
+
+        GridCacheOptimisticCheckPreparedTxFuture fut = new GridCacheOptimisticCheckPreparedTxFuture<>(
+            cctx,
+            tx,
+            tx.originatingNodeId(),
+            tx.transactionNodes());
+
+        cctx.mvcc().addFuture(fut);
+
+        if (log.isDebugEnabled())
+            log.debug("Checking optimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']');
+
+        fut.prepare();
+    }
+
+    /**
      * Timeout object for node failure handler.
      */
     private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter {
@@ -2122,31 +2147,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 cctx.kernalContext().gateway().readUnlock();
             }
         }
-
-        /**
-         * Commits optimistic transaction in case when node started transaction failed, but all related
-         * transactions were prepared (invalidates transaction if it is not fully prepared).
-         *
-         * @param tx Transaction.
-         */
-        private void commitIfPrepared(IgniteInternalTx tx) {
-            assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote  : tx;
-            assert !F.isEmpty(tx.transactionNodes()) : tx;
-            assert tx.nearXidVersion() != null : tx;
-
-            GridCacheOptimisticCheckPreparedTxFuture fut = new GridCacheOptimisticCheckPreparedTxFuture<>(
-                cctx,
-                tx,
-                evtNodeId,
-                tx.transactionNodes());
-
-            cctx.mvcc().addFuture(fut);
-
-            if (log.isDebugEnabled())
-                log.debug("Checking optimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']');
-
-            fut.prepare();
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65099366/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
index 7a393d8..ee2f16b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -149,7 +149,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
     }
 
     /**
-     * @param locBackupKey If {@code true} uses recovery for local backup key.
+     * @param locBackupKey If {@code true} uses one key which is backup for originating node.
      * @param rollback If {@code true} tests rollback after primary node failure.
      * @param optimistic If {@code true} tests optimistic transaction.
      * @throws Exception If failed.
@@ -177,6 +177,9 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
         final Integer key1 = key0;
         final Integer key2 = primaryKey(cache2);
 
+        final Collection<ClusterNode> key1Nodes = aff.mapKeyToPrimaryAndBackups(key1);
+        final Collection<ClusterNode> key2Nodes = aff.mapKeyToPrimaryAndBackups(key2);
+
         TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
 
         IgniteTransactions txs = ignite(0).transactions();
@@ -225,8 +228,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
                 try {
-                    checkKey(key1, rollback);
-                    checkKey(key2, rollback);
+                    checkKey(key1, rollback ? null : key1Nodes);
+                    checkKey(key2, rollback ? null : key2Nodes);
 
                     return true;
                 }
@@ -238,51 +241,105 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
             }
         }, 5000);
 
-        checkKey(key1, rollback);
-        checkKey(key2, rollback);
+        checkKey(key1, rollback ? null : key1Nodes);
+        checkKey(key2, rollback ? null : key2Nodes);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery1() throws Exception {
+        primaryAndOriginatingNodeFailure(false, false, true);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery() throws Exception {
-        primaryAndOriginatingNodeFailure(false, true);
+    public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery2() throws Exception {
+        primaryAndOriginatingNodeFailure(true, false, true);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testOptimisticPrimaryAndOriginatingNodeFailureRollback() throws Exception {
-        primaryAndOriginatingNodeFailure(true, true);
+    public void testOptimisticPrimaryAndOriginatingNodeFailureRollback1() throws Exception {
+        primaryAndOriginatingNodeFailure(false, true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testPessimisticPrimaryAndOriginatingNodeFailureRecovery() throws Exception {
-        primaryAndOriginatingNodeFailure(false, false);
+    public void testOptimisticPrimaryAndOriginatingNodeFailureRollback2() throws Exception {
+        primaryAndOriginatingNodeFailure(true, true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testPessimisticPrimaryAndOriginatingNodeFailureRollback() throws Exception {
-        primaryAndOriginatingNodeFailure(true, false);
+    public void testPessimisticPrimaryAndOriginatingNodeFailureRecovery1() throws Exception {
+        primaryAndOriginatingNodeFailure(false, false, false);
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryAndOriginatingNodeFailureRecovery2() throws Exception {
+        primaryAndOriginatingNodeFailure(true, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryAndOriginatingNodeFailureRollback1() throws Exception {
+        primaryAndOriginatingNodeFailure(false, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryAndOriginatingNodeFailureRollback2() throws Exception {
+        primaryAndOriginatingNodeFailure(true, true, false);
+    }
+
+    /**
+     * @param locBackupKey If {@code true} uses one key which is backup for originating node.
      * @param rollback If {@code true} tests rollback after primary node failure.
      * @param optimistic If {@code true} tests optimistic transaction.
      * @throws Exception If failed.
      */
-    private void primaryAndOriginatingNodeFailure(final boolean rollback, boolean optimistic) throws Exception {
+    private void primaryAndOriginatingNodeFailure(final boolean locBackupKey,
+        final boolean rollback,
+        boolean optimistic)
+        throws Exception
+    {
         IgniteCache<Integer, Integer> cache0 = jcache(0);
-        IgniteCache<Integer, Integer> cache1 = jcache(1);
         IgniteCache<Integer, Integer> cache2 = jcache(2);
 
-        final Integer key1 = primaryKey(cache1);
+        Affinity<Integer> aff = ignite(0).affinity(null);
+
+        Integer key0 = null;
+
+        for (int key = 0; key < 10_000; key++) {
+            if (aff.isPrimary(ignite(1).cluster().localNode(), key)) {
+                if (locBackupKey == aff.isBackup(ignite(0).cluster().localNode(), key)) {
+                    key0 = key;
+
+                    break;
+                }
+            }
+        }
+
+        assertNotNull(key0);
+
+        final Integer key1 = key0;
         final Integer key2 = primaryKey(cache2);
 
+        int backups = cache0.getConfiguration(CacheConfiguration.class).getBackups();
+
+        final Collection<ClusterNode> key1Nodes =
+            (locBackupKey && backups < 2) ? null : aff.mapKeyToPrimaryAndBackups(key1);
+        final Collection<ClusterNode> key2Nodes = aff.mapKeyToPrimaryAndBackups(key2);
+
         TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
 
         IgniteTransactions txs = ignite(0).transactions();
@@ -326,12 +383,11 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
                 try {
-                    checkKey(key1, rollback);
-                    checkKey(key2, rollback);
+                    checkKey(key1, rollback ? null : key1Nodes);
+                    checkKey(key2, rollback ? null : key2Nodes);
 
                     return true;
-                }
-                catch (AssertionError e) {
+                } catch (AssertionError e) {
                     log.info("Check failed: " + e);
 
                     return false;
@@ -339,30 +395,53 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
             }
         }, 5000);
 
-        checkKey(key1, rollback);
-        checkKey(key2, rollback);
+        checkKey(key1, rollback ? null : key1Nodes);
+        checkKey(key2, rollback ? null : key2Nodes);
     }
 
     /**
      * @param key Key.
-     * @param expNull {@code True} if {@code null} value is expected.
+     * @param keyNodes Key nodes.
      */
-    private void checkKey(Integer key, boolean expNull) {
-        Affinity<Integer> aff = ignite(2).affinity(null);
+    private void checkKey(Integer key, Collection<ClusterNode> keyNodes) {
+        if (keyNodes == null) {
+            for (Ignite ignite : G.allGrids()) {
+                IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+                assertNull("Unexpected value for: " + ignite.name(), cache.localPeek(key));
+            }
 
-        Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(key);
+            for (Ignite ignite : G.allGrids()) {
+                IgniteCache<Integer, Integer> cache = ignite.cache(null);
 
-        assertFalse(nodes.isEmpty());
+                assertNull("Unexpected value for: " + ignite.name(), cache.get(key));
+            }
+        }
+        else {
+            boolean found = false;
 
-        for (ClusterNode node : nodes) {
-            Ignite ignite = grid(node);
+            for (ClusterNode node : keyNodes) {
+                try {
+                    Ignite ignite = grid(node);
 
-            IgniteCache<Integer, Integer> cache = ignite.cache(null);
+                    found = true;
 
-            if (expNull)
-                assertNull("Unexpected value for: " + ignite.name(), cache.localPeek(key));
-            else
-                assertEquals("Unexpected value for: " + ignite.name(), key, cache.localPeek(key));
+                    IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+                    assertEquals("Unexpected value for: " + ignite.name(), key, key);
+                }
+                catch (IgniteIllegalStateException ignore) {
+                    // No-op.
+                }
+            }
+
+            assertTrue("Failed to find key node.", found);
+
+            for (Ignite ignite : G.allGrids()) {
+                IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+                assertEquals("Unexpected value for: " + ignite.name(), key, cache.get(key));
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65099366/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index bb019ae..28b10d9 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -353,9 +353,6 @@ public class IgniteCacheTestSuite extends TestSuite {
         // Iterators.
         suite.addTest(IgniteCacheIteratorsSelfTestSuite.suite());
 
-        // Add tx recovery test suite.
-        suite.addTest(IgniteCacheTxRecoverySelfTestSuite.suite());
-
         // Cache interceptor tests.
         suite.addTest(IgniteCacheInterceptorSelfTestSuite.suite());