You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/05/08 04:58:05 UTC
[09/36] incubator-ignite git commit: # ignite-157-2
# ignite-157-2
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/65099366
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/65099366
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/65099366
Branch: refs/heads/ignite-sprint-5
Commit: 65099366d767073de9a1e7a5d1b5ed67b4306fe8
Parents: fbf7149
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 5 10:35:20 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 5 13:50:49 2015 +0300
----------------------------------------------------------------------
...ridCacheOptimisticCheckPreparedTxFuture.java | 14 +-
.../cache/transactions/IgniteTxManager.java | 50 +++----
...ePrimaryNodeFailureRecoveryAbstractTest.java | 147 ++++++++++++++-----
.../ignite/testsuites/IgniteCacheTestSuite.java | 3 -
4 files changed, 150 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65099366/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
index 3e345f4..bd3e1cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
@@ -163,6 +163,9 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
try {
cctx.io().send(nearNodeId, req, tx.ioPolicy());
}
+ catch (ClusterTopologyCheckedException e) {
+ fut.onNodeLeft();
+ }
catch (IgniteCheckedException e) {
fut.onError(e);
}
@@ -398,7 +401,7 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
cctx.tm().finishOptimisticTxOnRecovery(tx, res);
}
else {
- if (nearTxCheck) {
+ if (err instanceof ClusterTopologyCheckedException && nearTxCheck) {
if (log.isDebugEnabled())
log.debug("Failed to check transaction on near node, " +
"ignoring [err=" + err + ", tx=" + tx + ']');
@@ -480,7 +483,14 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
if (log.isDebugEnabled())
log.debug("Transaction node left grid (will ignore) [fut=" + this + ']');
- onDone(true);
+ if (nearTxCheck) {
+ // Near and originating nodes left, need to initiate tx check.
+ cctx.tm().commitIfPrepared(tx);
+
+ onDone(new ClusterTopologyCheckedException("Transaction node left grid (will ignore)."));
+ }
+ else
+ onDone(true);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65099366/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 19efc5d..85b3ad0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -2053,6 +2053,31 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * Commits the optimistic transaction when the node that started it has failed but all related
+ * transactions were prepared (invalidates the transaction if it is not fully prepared).
+ *
+ * @param tx Transaction.
+ */
+ public void commitIfPrepared(IgniteInternalTx tx) {
+ assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote : tx;
+ assert !F.isEmpty(tx.transactionNodes()) : tx;
+ assert tx.nearXidVersion() != null : tx;
+
+ GridCacheOptimisticCheckPreparedTxFuture fut = new GridCacheOptimisticCheckPreparedTxFuture<>(
+ cctx,
+ tx,
+ tx.originatingNodeId(),
+ tx.transactionNodes());
+
+ cctx.mvcc().addFuture(fut);
+
+ if (log.isDebugEnabled())
+ log.debug("Checking optimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']');
+
+ fut.prepare();
+ }
+
+ /**
* Timeout object for node failure handler.
*/
private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter {
@@ -2122,31 +2147,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
cctx.kernalContext().gateway().readUnlock();
}
}
-
- /**
- * Commits optimistic transaction in case when node started transaction failed, but all related
- * transactions were prepared (invalidates transaction if it is not fully prepared).
- *
- * @param tx Transaction.
- */
- private void commitIfPrepared(IgniteInternalTx tx) {
- assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote : tx;
- assert !F.isEmpty(tx.transactionNodes()) : tx;
- assert tx.nearXidVersion() != null : tx;
-
- GridCacheOptimisticCheckPreparedTxFuture fut = new GridCacheOptimisticCheckPreparedTxFuture<>(
- cctx,
- tx,
- evtNodeId,
- tx.transactionNodes());
-
- cctx.mvcc().addFuture(fut);
-
- if (log.isDebugEnabled())
- log.debug("Checking optimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']');
-
- fut.prepare();
- }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65099366/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
index 7a393d8..ee2f16b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -149,7 +149,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
}
/**
- * @param locBackupKey If {@code true} uses recovery for local backup key.
+ * @param locBackupKey If {@code true} uses one key which is backup for originating node.
* @param rollback If {@code true} tests rollback after primary node failure.
* @param optimistic If {@code true} tests optimistic transaction.
* @throws Exception If failed.
@@ -177,6 +177,9 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
final Integer key1 = key0;
final Integer key2 = primaryKey(cache2);
+ final Collection<ClusterNode> key1Nodes = aff.mapKeyToPrimaryAndBackups(key1);
+ final Collection<ClusterNode> key2Nodes = aff.mapKeyToPrimaryAndBackups(key2);
+
TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
IgniteTransactions txs = ignite(0).transactions();
@@ -225,8 +228,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
try {
- checkKey(key1, rollback);
- checkKey(key2, rollback);
+ checkKey(key1, rollback ? null : key1Nodes);
+ checkKey(key2, rollback ? null : key2Nodes);
return true;
}
@@ -238,51 +241,105 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
}
}, 5000);
- checkKey(key1, rollback);
- checkKey(key2, rollback);
+ checkKey(key1, rollback ? null : key1Nodes);
+ checkKey(key2, rollback ? null : key2Nodes);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery1() throws Exception {
+ primaryAndOriginatingNodeFailure(false, false, true);
}
/**
* @throws Exception If failed.
*/
- public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery() throws Exception {
- primaryAndOriginatingNodeFailure(false, true);
+ public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery2() throws Exception {
+ primaryAndOriginatingNodeFailure(true, false, true);
}
/**
* @throws Exception If failed.
*/
- public void testOptimisticPrimaryAndOriginatingNodeFailureRollback() throws Exception {
- primaryAndOriginatingNodeFailure(true, true);
+ public void testOptimisticPrimaryAndOriginatingNodeFailureRollback1() throws Exception {
+ primaryAndOriginatingNodeFailure(false, true, true);
}
/**
* @throws Exception If failed.
*/
- public void testPessimisticPrimaryAndOriginatingNodeFailureRecovery() throws Exception {
- primaryAndOriginatingNodeFailure(false, false);
+ public void testOptimisticPrimaryAndOriginatingNodeFailureRollback2() throws Exception {
+ primaryAndOriginatingNodeFailure(true, true, true);
}
/**
* @throws Exception If failed.
*/
- public void testPessimisticPrimaryAndOriginatingNodeFailureRollback() throws Exception {
- primaryAndOriginatingNodeFailure(true, false);
+ public void testPessimisticPrimaryAndOriginatingNodeFailureRecovery1() throws Exception {
+ primaryAndOriginatingNodeFailure(false, false, false);
}
/**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticPrimaryAndOriginatingNodeFailureRecovery2() throws Exception {
+ primaryAndOriginatingNodeFailure(true, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticPrimaryAndOriginatingNodeFailureRollback1() throws Exception {
+ primaryAndOriginatingNodeFailure(false, true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticPrimaryAndOriginatingNodeFailureRollback2() throws Exception {
+ primaryAndOriginatingNodeFailure(true, true, false);
+ }
+
+ /**
+ * @param locBackupKey If {@code true} uses one key which is backup for originating node.
* @param rollback If {@code true} tests rollback after primary node failure.
* @param optimistic If {@code true} tests optimistic transaction.
* @throws Exception If failed.
*/
- private void primaryAndOriginatingNodeFailure(final boolean rollback, boolean optimistic) throws Exception {
+ private void primaryAndOriginatingNodeFailure(final boolean locBackupKey,
+ final boolean rollback,
+ boolean optimistic)
+ throws Exception
+ {
IgniteCache<Integer, Integer> cache0 = jcache(0);
- IgniteCache<Integer, Integer> cache1 = jcache(1);
IgniteCache<Integer, Integer> cache2 = jcache(2);
- final Integer key1 = primaryKey(cache1);
+ Affinity<Integer> aff = ignite(0).affinity(null);
+
+ Integer key0 = null;
+
+ for (int key = 0; key < 10_000; key++) {
+ if (aff.isPrimary(ignite(1).cluster().localNode(), key)) {
+ if (locBackupKey == aff.isBackup(ignite(0).cluster().localNode(), key)) {
+ key0 = key;
+
+ break;
+ }
+ }
+ }
+
+ assertNotNull(key0);
+
+ final Integer key1 = key0;
final Integer key2 = primaryKey(cache2);
+ int backups = cache0.getConfiguration(CacheConfiguration.class).getBackups();
+
+ final Collection<ClusterNode> key1Nodes =
+ (locBackupKey && backups < 2) ? null : aff.mapKeyToPrimaryAndBackups(key1);
+ final Collection<ClusterNode> key2Nodes = aff.mapKeyToPrimaryAndBackups(key2);
+
TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
IgniteTransactions txs = ignite(0).transactions();
@@ -326,12 +383,11 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
try {
- checkKey(key1, rollback);
- checkKey(key2, rollback);
+ checkKey(key1, rollback ? null : key1Nodes);
+ checkKey(key2, rollback ? null : key2Nodes);
return true;
- }
- catch (AssertionError e) {
+ } catch (AssertionError e) {
log.info("Check failed: " + e);
return false;
@@ -339,30 +395,53 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
}
}, 5000);
- checkKey(key1, rollback);
- checkKey(key2, rollback);
+ checkKey(key1, rollback ? null : key1Nodes);
+ checkKey(key2, rollback ? null : key2Nodes);
}
/**
* @param key Key.
- * @param expNull {@code True} if {@code null} value is expected.
+ * @param keyNodes Nodes expected to hold the value for the key, or {@code null} if no grid may have a value for it.
*/
- private void checkKey(Integer key, boolean expNull) {
- Affinity<Integer> aff = ignite(2).affinity(null);
+ private void checkKey(Integer key, Collection<ClusterNode> keyNodes) {
+ if (keyNodes == null) {
+ for (Ignite ignite : G.allGrids()) {
+ IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+ assertNull("Unexpected value for: " + ignite.name(), cache.localPeek(key));
+ }
- Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(key);
+ for (Ignite ignite : G.allGrids()) {
+ IgniteCache<Integer, Integer> cache = ignite.cache(null);
- assertFalse(nodes.isEmpty());
+ assertNull("Unexpected value for: " + ignite.name(), cache.get(key));
+ }
+ }
+ else {
+ boolean found = false;
- for (ClusterNode node : nodes) {
- Ignite ignite = grid(node);
+ for (ClusterNode node : keyNodes) {
+ try {
+ Ignite ignite = grid(node);
- IgniteCache<Integer, Integer> cache = ignite.cache(null);
+ found = true;
- if (expNull)
- assertNull("Unexpected value for: " + ignite.name(), cache.localPeek(key));
- else
- assertEquals("Unexpected value for: " + ignite.name(), key, cache.localPeek(key));
+ IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+ assertEquals("Unexpected value for: " + ignite.name(), key, cache.localPeek(key));
+ }
+ catch (IgniteIllegalStateException ignore) {
+ // No grid is available for this node (presumably it was stopped by the test), skip it.
+ }
+ }
+
+ assertTrue("Failed to find key node.", found);
+
+ for (Ignite ignite : G.allGrids()) {
+ IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+ assertEquals("Unexpected value for: " + ignite.name(), key, cache.get(key));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65099366/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index bb019ae..28b10d9 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -353,9 +353,6 @@ public class IgniteCacheTestSuite extends TestSuite {
// Iterators.
suite.addTest(IgniteCacheIteratorsSelfTestSuite.suite());
- // Add tx recovery test suite.
- suite.addTest(IgniteCacheTxRecoverySelfTestSuite.suite());
-
// Cache interceptor tests.
suite.addTest(IgniteCacheInterceptorSelfTestSuite.suite());