You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/06 12:24:32 UTC
[01/18] incubator-ignite git commit: # ignite-157-2
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-841 ed8a1ee8a -> c2fceac7b
# ignite-157-2
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d14a0fb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d14a0fb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d14a0fb4
Branch: refs/heads/ignite-841
Commit: d14a0fb40eca2c6259f2be66bcffbc1c53f988b8
Parents: 8062947
Author: sboikov <sb...@gridgain.com>
Authored: Mon Apr 27 17:03:48 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Apr 27 17:03:48 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAbstractFailoverSelfTest.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d14a0fb4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
index 4cb7365..5389ef9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
@@ -177,11 +177,11 @@ public abstract class GridCacheAbstractFailoverSelfTest extends GridCacheAbstrac
info("Run topology change.");
try {
+ String name = "new-node-" + Thread.currentThread().getName();
+
for (int i = 0; i < TOP_CHANGE_CNT && err.get() == null; i++) {
info("Topology change " + i);
- String name = UUID.randomUUID().toString();
-
try {
final Ignite g = startGrid(name);
[18/18] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-sprint-4' into ignite-841
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-4' into ignite-841
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c2fceac7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c2fceac7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c2fceac7
Branch: refs/heads/ignite-841
Commit: c2fceac7ba65cb15847fdd3757537b86d297169c
Parents: ed8a1ee d4908f2
Author: ivasilinets <iv...@gridgain.com>
Authored: Wed May 6 13:24:12 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Wed May 6 13:24:12 2015 +0300
----------------------------------------------------------------------
.../ignite/codegen/MessageCodeGenerator.java | 4 +-
.../communication/GridIoMessageFactory.java | 4 +-
.../cache/DynamicCacheDescriptor.java | 16 +-
.../processors/cache/GridCacheAdapter.java | 511 +++++++++---------
.../processors/cache/GridCacheProcessor.java | 117 ++--
...ridCacheOptimisticCheckPreparedTxFuture.java | 434 ---------------
...idCacheOptimisticCheckPreparedTxRequest.java | 232 --------
...dCacheOptimisticCheckPreparedTxResponse.java | 179 -------
.../distributed/GridCacheTxRecoveryFuture.java | 506 ++++++++++++++++++
.../distributed/GridCacheTxRecoveryRequest.java | 261 +++++++++
.../GridCacheTxRecoveryResponse.java | 182 +++++++
.../GridDistributedTxRemoteAdapter.java | 2 +-
.../cache/transactions/IgniteInternalTx.java | 5 +-
.../cache/transactions/IgniteTxAdapter.java | 2 +-
.../cache/transactions/IgniteTxHandler.java | 38 +-
.../transactions/IgniteTxLocalAdapter.java | 2 +-
.../cache/transactions/IgniteTxManager.java | 173 ++----
.../resources/META-INF/classnames.properties | 12 +-
.../processors/cache/CacheGetFromJobTest.java | 110 ++++
.../GridCacheAbstractFailoverSelfTest.java | 4 +-
.../GridCacheAbstractNodeRestartSelfTest.java | 94 ++--
...xOriginatingNodeFailureAbstractSelfTest.java | 2 +-
...rDisabledPrimaryNodeFailureRecoveryTest.java | 31 ++
...rtitionedPrimaryNodeFailureRecoveryTest.java | 31 ++
...woBackupsPrimaryNodeFailureRecoveryTest.java | 37 ++
...ePrimaryNodeFailureRecoveryAbstractTest.java | 533 +++++++++++++++++++
.../GridCachePartitionedNodeRestartTest.java | 4 +-
...ePartitionedOptimisticTxNodeRestartTest.java | 4 +-
.../GridCacheReplicatedNodeRestartSelfTest.java | 2 +
.../testsuites/IgniteCacheRestartTestSuite.java | 5 +-
.../ignite/testsuites/IgniteCacheTestSuite.java | 3 -
.../IgniteCacheTxRecoverySelfTestSuite.java | 4 +
32 files changed, 2172 insertions(+), 1372 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2fceac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
[08/18] incubator-ignite git commit: # ignite-157-2
Posted by sb...@apache.org.
# ignite-157-2
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/65099366
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/65099366
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/65099366
Branch: refs/heads/ignite-841
Commit: 65099366d767073de9a1e7a5d1b5ed67b4306fe8
Parents: fbf7149
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 5 10:35:20 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 5 13:50:49 2015 +0300
----------------------------------------------------------------------
...ridCacheOptimisticCheckPreparedTxFuture.java | 14 +-
.../cache/transactions/IgniteTxManager.java | 50 +++----
...ePrimaryNodeFailureRecoveryAbstractTest.java | 147 ++++++++++++++-----
.../ignite/testsuites/IgniteCacheTestSuite.java | 3 -
4 files changed, 150 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65099366/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
index 3e345f4..bd3e1cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
@@ -163,6 +163,9 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
try {
cctx.io().send(nearNodeId, req, tx.ioPolicy());
}
+ catch (ClusterTopologyCheckedException e) {
+ fut.onNodeLeft();
+ }
catch (IgniteCheckedException e) {
fut.onError(e);
}
@@ -398,7 +401,7 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
cctx.tm().finishOptimisticTxOnRecovery(tx, res);
}
else {
- if (nearTxCheck) {
+ if (err instanceof ClusterTopologyCheckedException && nearTxCheck) {
if (log.isDebugEnabled())
log.debug("Failed to check transaction on near node, " +
"ignoring [err=" + err + ", tx=" + tx + ']');
@@ -480,7 +483,14 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
if (log.isDebugEnabled())
log.debug("Transaction node left grid (will ignore) [fut=" + this + ']');
- onDone(true);
+ if (nearTxCheck) {
+ // Near and originating nodes have left, need to initiate tx check.
+ cctx.tm().commitIfPrepared(tx);
+
+ onDone(new ClusterTopologyCheckedException("Transaction node left grid (will ignore)."));
+ }
+ else
+ onDone(true);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65099366/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 19efc5d..85b3ad0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -2053,6 +2053,31 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * Commits an optimistic transaction when the node that started the transaction has failed but all related
+ * transactions were prepared (invalidates the transaction if it is not fully prepared).
+ *
+ * @param tx Transaction.
+ */
+ public void commitIfPrepared(IgniteInternalTx tx) {
+ assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote : tx;
+ assert !F.isEmpty(tx.transactionNodes()) : tx;
+ assert tx.nearXidVersion() != null : tx;
+
+ GridCacheOptimisticCheckPreparedTxFuture fut = new GridCacheOptimisticCheckPreparedTxFuture<>(
+ cctx,
+ tx,
+ tx.originatingNodeId(),
+ tx.transactionNodes());
+
+ cctx.mvcc().addFuture(fut);
+
+ if (log.isDebugEnabled())
+ log.debug("Checking optimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']');
+
+ fut.prepare();
+ }
+
+ /**
* Timeout object for node failure handler.
*/
private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter {
@@ -2122,31 +2147,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
cctx.kernalContext().gateway().readUnlock();
}
}
-
- /**
- * Commits optimistic transaction in case when node started transaction failed, but all related
- * transactions were prepared (invalidates transaction if it is not fully prepared).
- *
- * @param tx Transaction.
- */
- private void commitIfPrepared(IgniteInternalTx tx) {
- assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote : tx;
- assert !F.isEmpty(tx.transactionNodes()) : tx;
- assert tx.nearXidVersion() != null : tx;
-
- GridCacheOptimisticCheckPreparedTxFuture fut = new GridCacheOptimisticCheckPreparedTxFuture<>(
- cctx,
- tx,
- evtNodeId,
- tx.transactionNodes());
-
- cctx.mvcc().addFuture(fut);
-
- if (log.isDebugEnabled())
- log.debug("Checking optimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']');
-
- fut.prepare();
- }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65099366/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
index 7a393d8..ee2f16b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -149,7 +149,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
}
/**
- * @param locBackupKey If {@code true} uses recovery for local backup key.
+ * @param locBackupKey If {@code true} uses one key which is backup for originating node.
* @param rollback If {@code true} tests rollback after primary node failure.
* @param optimistic If {@code true} tests optimistic transaction.
* @throws Exception If failed.
@@ -177,6 +177,9 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
final Integer key1 = key0;
final Integer key2 = primaryKey(cache2);
+ final Collection<ClusterNode> key1Nodes = aff.mapKeyToPrimaryAndBackups(key1);
+ final Collection<ClusterNode> key2Nodes = aff.mapKeyToPrimaryAndBackups(key2);
+
TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
IgniteTransactions txs = ignite(0).transactions();
@@ -225,8 +228,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
try {
- checkKey(key1, rollback);
- checkKey(key2, rollback);
+ checkKey(key1, rollback ? null : key1Nodes);
+ checkKey(key2, rollback ? null : key2Nodes);
return true;
}
@@ -238,51 +241,105 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
}
}, 5000);
- checkKey(key1, rollback);
- checkKey(key2, rollback);
+ checkKey(key1, rollback ? null : key1Nodes);
+ checkKey(key2, rollback ? null : key2Nodes);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery1() throws Exception {
+ primaryAndOriginatingNodeFailure(false, false, true);
}
/**
* @throws Exception If failed.
*/
- public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery() throws Exception {
- primaryAndOriginatingNodeFailure(false, true);
+ public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery2() throws Exception {
+ primaryAndOriginatingNodeFailure(true, false, true);
}
/**
* @throws Exception If failed.
*/
- public void testOptimisticPrimaryAndOriginatingNodeFailureRollback() throws Exception {
- primaryAndOriginatingNodeFailure(true, true);
+ public void testOptimisticPrimaryAndOriginatingNodeFailureRollback1() throws Exception {
+ primaryAndOriginatingNodeFailure(false, true, true);
}
/**
* @throws Exception If failed.
*/
- public void testPessimisticPrimaryAndOriginatingNodeFailureRecovery() throws Exception {
- primaryAndOriginatingNodeFailure(false, false);
+ public void testOptimisticPrimaryAndOriginatingNodeFailureRollback2() throws Exception {
+ primaryAndOriginatingNodeFailure(true, true, true);
}
/**
* @throws Exception If failed.
*/
- public void testPessimisticPrimaryAndOriginatingNodeFailureRollback() throws Exception {
- primaryAndOriginatingNodeFailure(true, false);
+ public void testPessimisticPrimaryAndOriginatingNodeFailureRecovery1() throws Exception {
+ primaryAndOriginatingNodeFailure(false, false, false);
}
/**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticPrimaryAndOriginatingNodeFailureRecovery2() throws Exception {
+ primaryAndOriginatingNodeFailure(true, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticPrimaryAndOriginatingNodeFailureRollback1() throws Exception {
+ primaryAndOriginatingNodeFailure(false, true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticPrimaryAndOriginatingNodeFailureRollback2() throws Exception {
+ primaryAndOriginatingNodeFailure(true, true, false);
+ }
+
+ /**
+ * @param locBackupKey If {@code true} uses one key which is backup for originating node.
* @param rollback If {@code true} tests rollback after primary node failure.
* @param optimistic If {@code true} tests optimistic transaction.
* @throws Exception If failed.
*/
- private void primaryAndOriginatingNodeFailure(final boolean rollback, boolean optimistic) throws Exception {
+ private void primaryAndOriginatingNodeFailure(final boolean locBackupKey,
+ final boolean rollback,
+ boolean optimistic)
+ throws Exception
+ {
IgniteCache<Integer, Integer> cache0 = jcache(0);
- IgniteCache<Integer, Integer> cache1 = jcache(1);
IgniteCache<Integer, Integer> cache2 = jcache(2);
- final Integer key1 = primaryKey(cache1);
+ Affinity<Integer> aff = ignite(0).affinity(null);
+
+ Integer key0 = null;
+
+ for (int key = 0; key < 10_000; key++) {
+ if (aff.isPrimary(ignite(1).cluster().localNode(), key)) {
+ if (locBackupKey == aff.isBackup(ignite(0).cluster().localNode(), key)) {
+ key0 = key;
+
+ break;
+ }
+ }
+ }
+
+ assertNotNull(key0);
+
+ final Integer key1 = key0;
final Integer key2 = primaryKey(cache2);
+ int backups = cache0.getConfiguration(CacheConfiguration.class).getBackups();
+
+ final Collection<ClusterNode> key1Nodes =
+ (locBackupKey && backups < 2) ? null : aff.mapKeyToPrimaryAndBackups(key1);
+ final Collection<ClusterNode> key2Nodes = aff.mapKeyToPrimaryAndBackups(key2);
+
TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
IgniteTransactions txs = ignite(0).transactions();
@@ -326,12 +383,11 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
try {
- checkKey(key1, rollback);
- checkKey(key2, rollback);
+ checkKey(key1, rollback ? null : key1Nodes);
+ checkKey(key2, rollback ? null : key2Nodes);
return true;
- }
- catch (AssertionError e) {
+ } catch (AssertionError e) {
log.info("Check failed: " + e);
return false;
@@ -339,30 +395,53 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
}
}, 5000);
- checkKey(key1, rollback);
- checkKey(key2, rollback);
+ checkKey(key1, rollback ? null : key1Nodes);
+ checkKey(key2, rollback ? null : key2Nodes);
}
/**
* @param key Key.
- * @param expNull {@code True} if {@code null} value is expected.
+ * @param keyNodes Key nodes.
*/
- private void checkKey(Integer key, boolean expNull) {
- Affinity<Integer> aff = ignite(2).affinity(null);
+ private void checkKey(Integer key, Collection<ClusterNode> keyNodes) {
+ if (keyNodes == null) {
+ for (Ignite ignite : G.allGrids()) {
+ IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+ assertNull("Unexpected value for: " + ignite.name(), cache.localPeek(key));
+ }
- Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(key);
+ for (Ignite ignite : G.allGrids()) {
+ IgniteCache<Integer, Integer> cache = ignite.cache(null);
- assertFalse(nodes.isEmpty());
+ assertNull("Unexpected value for: " + ignite.name(), cache.get(key));
+ }
+ }
+ else {
+ boolean found = false;
- for (ClusterNode node : nodes) {
- Ignite ignite = grid(node);
+ for (ClusterNode node : keyNodes) {
+ try {
+ Ignite ignite = grid(node);
- IgniteCache<Integer, Integer> cache = ignite.cache(null);
+ found = true;
- if (expNull)
- assertNull("Unexpected value for: " + ignite.name(), cache.localPeek(key));
- else
- assertEquals("Unexpected value for: " + ignite.name(), key, cache.localPeek(key));
+ IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+ assertEquals("Unexpected value for: " + ignite.name(), key, key);
+ }
+ catch (IgniteIllegalStateException ignore) {
+ // No-op.
+ }
+ }
+
+ assertTrue("Failed to find key node.", found);
+
+ for (Ignite ignite : G.allGrids()) {
+ IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+ assertEquals("Unexpected value for: " + ignite.name(), key, cache.get(key));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65099366/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index bb019ae..28b10d9 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -353,9 +353,6 @@ public class IgniteCacheTestSuite extends TestSuite {
// Iterators.
suite.addTest(IgniteCacheIteratorsSelfTestSuite.suite());
- // Add tx recovery test suite.
- suite.addTest(IgniteCacheTxRecoverySelfTestSuite.suite());
-
// Cache interceptor tests.
suite.addTest(IgniteCacheInterceptorSelfTestSuite.suite());
[07/18] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-sprint-4' into ignite-157-2
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-4' into ignite-157-2
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fbf7149d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fbf7149d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fbf7149d
Branch: refs/heads/ignite-841
Commit: fbf7149d775472f9dc1e9cba9a0c10a6780688e6
Parents: 45199eb 54f9492
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 5 09:56:03 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 5 09:56:03 2015 +0300
----------------------------------------------------------------------
DEVNOTES.txt | 4 +-
assembly/release-base.xml | 2 +
assembly/release-schema-import.xml | 50 +++
.../streaming/wordcount/CacheConfig.java | 5 -
.../config/grid-client-config.properties | 50 +--
.../ClientPropertiesConfigurationSelfTest.java | 12 +-
.../java/org/apache/ignite/IgniteCache.java | 5 +
.../org/apache/ignite/IgniteJdbcDriver.java | 81 +++--
.../configuration/CacheConfiguration.java | 255 +++++++++++---
.../configuration/IgniteConfiguration.java | 344 +++++++++++++++----
.../ignite/internal/GridUpdateNotifier.java | 66 +++-
.../apache/ignite/internal/IgniteKernal.java | 83 +++--
.../org/apache/ignite/internal/IgnitionEx.java | 15 +-
.../client/GridClientConfiguration.java | 2 +-
.../managers/communication/GridIoManager.java | 8 +-
.../processors/cache/GridCacheTtlManager.java | 168 +++++----
.../processors/cache/GridCacheUtils.java | 5 +-
.../apache/ignite/lang/IgniteAsyncSupport.java | 4 +-
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 19 +-
.../discovery/tcp/TcpClientDiscoverySpi.java | 4 -
.../spi/discovery/tcp/TcpDiscoverySpi.java | 4 -
.../discovery/tcp/TcpDiscoverySpiAdapter.java | 8 +-
.../internal/GridUpdateNotifierSelfTest.java | 13 +-
.../IgniteCacheEntryListenerAbstractTest.java | 4 +-
...CacheLoadingConcurrentGridStartSelfTest.java | 154 +++++++++
...GridCacheLoadingConcurrentGridStartTest.java | 154 ---------
.../tcp/TcpClientDiscoverySelfTest.java | 8 +
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +-
modules/schema-import/pom.xml | 6 +-
pom.xml | 227 ++++++++++--
30 files changed, 1225 insertions(+), 537 deletions(-)
----------------------------------------------------------------------
[03/18] incubator-ignite git commit: Merge remote-tracking branch
'origin/ignite-157-2' into ignite-157-2
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-157-2' into ignite-157-2
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fc54ef7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fc54ef7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fc54ef7a
Branch: refs/heads/ignite-841
Commit: fc54ef7a10cd6e28152b19d21de320d47a5c3e3c
Parents: e0810ed 2ea83ce
Author: sboikov <sb...@gridgain.com>
Authored: Wed Apr 29 11:55:59 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 29 11:55:59 2015 +0300
----------------------------------------------------------------------
.../ignite/testsuites/IgniteCacheFailoverTestSuite.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
[10/18] incubator-ignite git commit: # ignite-157-2
Posted by sb...@apache.org.
# ignite-157-2
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/42563f6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/42563f6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/42563f6c
Branch: refs/heads/ignite-841
Commit: 42563f6c1b0f57e6a087c819df26893df5510ae9
Parents: 3394b4c
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 5 16:06:52 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 5 17:53:56 2015 +0300
----------------------------------------------------------------------
.../internal/processors/cache/transactions/IgniteTxHandler.java | 5 +++++
.../apache/ignite/testsuites/IgniteCacheRestartTestSuite.java | 2 +-
2 files changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/42563f6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 6843075..2897e30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -53,6 +53,11 @@ public class IgniteTxHandler {
/** Shared cache context. */
private GridCacheSharedContext<?, ?> ctx;
+ /**
+ * @param nearNodeId Node ID.
+ * @param req Request.
+ * @return Prepare future.
+ */
public IgniteInternalFuture<IgniteInternalTx> processNearTxPrepareRequest(final UUID nearNodeId,
final GridNearTxPrepareRequest req) {
return prepareTx(nearNodeId, null, req, null);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/42563f6c/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
index b219f7f..d620731 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
@@ -35,8 +35,8 @@ public class IgniteCacheRestartTestSuite extends TestSuite {
suite.addTestSuite(GridCachePartitionedTxSalvageSelfTest.class);
- suite.addTestSuite(GridCachePartitionedNodeRestartTest.class);
// TODO: IGNITE-157.
+ // suite.addTestSuite(GridCachePartitionedNodeRestartTest.class);
// suite.addTestSuite(GridCachePartitionedOptimisticTxNodeRestartTest.class);
// suite.addTestSuite(GridCacheReplicatedNodeRestartSelfTest.class);
[02/18] incubator-ignite git commit: Merge remote-tracking branch
'origin/ignite-157-2' into ignite-157-2
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-157-2' into ignite-157-2
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e0810ed8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e0810ed8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e0810ed8
Branch: refs/heads/ignite-841
Commit: e0810ed828ab802e96cbccf65078b71faf674968
Parents: d14a0fb 2229d74
Author: sboikov <sb...@gridgain.com>
Authored: Tue Apr 28 09:17:24 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Apr 28 09:17:24 2015 +0300
----------------------------------------------------------------------
.../main/java/org/apache/ignite/Ignition.java | 44 ++++
.../org/apache/ignite/internal/IgnitionEx.java | 165 +++++++++++++-
.../processors/cache/CacheInvokeResult.java | 24 +-
.../processors/cache/GridCacheAdapter.java | 2 +-
.../processors/cache/GridCacheMapEntry.java | 4 +-
.../processors/cache/GridCacheReturn.java | 5 +-
.../dht/atomic/GridDhtAtomicCache.java | 3 +-
.../local/atomic/GridLocalAtomicCache.java | 6 +-
.../util/spring/IgniteSpringHelper.java | 54 ++++-
.../DataStreamerMultiThreadedSelfTest.java | 19 +-
.../GridSwapSpaceSpiAbstractSelfTest.java | 2 +-
.../query/h2/sql/BaseH2CompareQueryTest.java | 32 +--
.../util/spring/IgniteSpringHelperImpl.java | 217 +++++++++++++++----
.../IgniteStartFromStreamConfigurationTest.java | 50 +++++
.../testsuites/IgniteSpringTestSuite.java | 2 +
15 files changed, 548 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
[16/18] incubator-ignite git commit: Merge remote-tracking branch
'origin/ignite-sprint-4' into ignite-sprint-4
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-sprint-4' into ignite-sprint-4
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7be25bd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7be25bd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7be25bd7
Branch: refs/heads/ignite-841
Commit: 7be25bd7859f6cf45e6454c44ed9b1501d87fd4b
Parents: ba210bb 587103f
Author: sboikov <sb...@gridgain.com>
Authored: Wed May 6 12:44:56 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed May 6 12:44:56 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheProcessor.java | 109 ++++++++++--------
.../processors/cache/CacheGetFromJobTest.java | 110 +++++++++++++++++++
.../testsuites/IgniteCacheRestartTestSuite.java | 1 +
3 files changed, 174 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7be25bd7/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
----------------------------------------------------------------------
[11/18] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-sprint-4' into ignite-157-2
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-4' into ignite-157-2
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b141abfd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b141abfd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b141abfd
Branch: refs/heads/ignite-841
Commit: b141abfd5c4c4219d17e6a1cc1a7a1677b06b3c0
Parents: 42563f6 07a4258
Author: sboikov <sb...@gridgain.com>
Authored: Wed May 6 09:12:56 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed May 6 09:12:56 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCachePartitionExchangeManager.java | 3 +++
.../cache/distributed/dht/GridCacheDhtPreloadSelfTest.java | 2 +-
2 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
[09/18] incubator-ignite git commit: # ignite-157-2
Posted by sb...@apache.org.
# ignite-157-2
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3394b4c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3394b4c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3394b4c5
Branch: refs/heads/ignite-841
Commit: 3394b4c5aa8c557b94c8726ad5f27a01f2d76d32
Parents: 6509936
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 5 15:16:09 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 5 15:41:34 2015 +0300
----------------------------------------------------------------------
.../GridCacheAbstractNodeRestartSelfTest.java | 94 +++++++++++---------
.../GridCachePartitionedNodeRestartTest.java | 4 +-
...ePartitionedOptimisticTxNodeRestartTest.java | 4 +-
.../GridCacheReplicatedNodeRestartSelfTest.java | 2 +
.../testsuites/IgniteCacheRestartTestSuite.java | 6 +-
5 files changed, 58 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3394b4c5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
index 17d6e42..85e2c7c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
@@ -77,10 +77,10 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
private static volatile int idx = -1;
/** Preload mode. */
- protected CacheRebalanceMode preloadMode = ASYNC;
+ protected CacheRebalanceMode rebalancMode = ASYNC;
/** */
- protected int preloadBatchSize = DFLT_BATCH_SIZE;
+ protected int rebalancBatchSize = DFLT_BATCH_SIZE;
/** Number of partitions. */
protected int partitions = DFLT_PARTITIONS;
@@ -124,8 +124,8 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
@Override protected void beforeTest() throws Exception {
backups = DFLT_BACKUPS;
partitions = DFLT_PARTITIONS;
- preloadMode = ASYNC;
- preloadBatchSize = DFLT_BATCH_SIZE;
+ rebalancMode = ASYNC;
+ rebalancBatchSize = DFLT_BATCH_SIZE;
nodeCnt = DFLT_NODE_CNT;
keyCnt = DFLT_KEY_CNT;
retries = DFLT_RETRIES;
@@ -160,7 +160,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
* @throws Exception If failed.
*/
public void testRestart() throws Exception {
- preloadMode = SYNC;
+ rebalancMode = SYNC;
partitions = 3;
nodeCnt = 2;
keyCnt = 10;
@@ -240,7 +240,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
nodeCnt = 2;
keyCnt = 10;
partitions = 29;
- preloadMode = ASYNC;
+ rebalancMode = ASYNC;
long duration = 30000;
@@ -255,7 +255,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
nodeCnt = 2;
keyCnt = 10;
partitions = 29;
- preloadMode = ASYNC;
+ rebalancMode = ASYNC;
long duration = 30000;
@@ -270,7 +270,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
nodeCnt = 2;
keyCnt = 10;
partitions = 29;
- preloadMode = ASYNC;
+ rebalancMode = ASYNC;
long duration = 30000;
@@ -285,7 +285,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
nodeCnt = 2;
keyCnt = 10;
partitions = 29;
- preloadMode = ASYNC;
+ rebalancMode = ASYNC;
long duration = 30000;
@@ -300,7 +300,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
nodeCnt = 4;
keyCnt = 10;
partitions = 29;
- preloadMode = ASYNC;
+ rebalancMode = ASYNC;
long duration = 60000;
@@ -315,7 +315,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
nodeCnt = 4;
keyCnt = 10;
partitions = 29;
- preloadMode = ASYNC;
+ rebalancMode = ASYNC;
long duration = 60000;
@@ -330,7 +330,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
nodeCnt = 4;
keyCnt = 10;
partitions = 29;
- preloadMode = ASYNC;
+ rebalancMode = ASYNC;
long duration = 60000;
@@ -345,7 +345,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
nodeCnt = 4;
keyCnt = 10;
partitions = 29;
- preloadMode = ASYNC;
+ rebalancMode = ASYNC;
long duration = 60000;
@@ -360,7 +360,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
nodeCnt = 6;
keyCnt = 10;
partitions = 29;
- preloadMode = ASYNC;
+ rebalancMode = ASYNC;
long duration = 90000;
@@ -375,7 +375,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
nodeCnt = 6;
keyCnt = 10;
partitions = 29;
- preloadMode = ASYNC;
+ rebalancMode = ASYNC;
long duration = 90000;
@@ -390,7 +390,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
nodeCnt = 8;
keyCnt = 10;
partitions = 29;
- preloadMode = ASYNC;
+ rebalancMode = ASYNC;
long duration = 90000;
@@ -405,7 +405,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
nodeCnt = 8;
keyCnt = 10;
partitions = 29;
- preloadMode = ASYNC;
+ rebalancMode = ASYNC;
long duration = 90000;
@@ -420,7 +420,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
nodeCnt = 10;
keyCnt = 10;
partitions = 29;
- preloadMode = ASYNC;
+ rebalancMode = ASYNC;
long duration = 90000;
@@ -435,7 +435,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
nodeCnt = 10;
keyCnt = 10;
partitions = 29;
- preloadMode = ASYNC;
+ rebalancMode = ASYNC;
long duration = 90000;
@@ -450,7 +450,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
nodeCnt = 10;
keyCnt = 10;
partitions = 29;
- preloadMode = ASYNC;
+ rebalancMode = ASYNC;
long duration = 90000;
@@ -465,7 +465,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
nodeCnt = 4;
keyCnt = 10;
partitions = 29;
- preloadMode = ASYNC;
+ rebalancMode = ASYNC;
long duration = 90000;
@@ -525,7 +525,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
catch (Exception e) {
err.compareAndSet(null, e);
- error("Failed to put value in cache.", e);
+ error("Unexpected exception in put-worker.", e);
}
}
}, "put-worker-" + i);
@@ -565,7 +565,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
catch (Exception e) {
err.compareAndSet(null, e);
- error("Failed to restart grid node.", e);
+ error("Unexpected exception in restart-worker.", e);
}
}
}, "restart-worker-" + i);
@@ -643,12 +643,16 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
int c = 0;
try {
- try (Transaction tx = ignite.transactions().txStart(txConcurrency(), REPEATABLE_READ)) {
+ IgniteTransactions txs = ignite.transactions();
+
+ try (Transaction tx = txs.txStart(txConcurrency(), REPEATABLE_READ)) {
c = txCntr.incrementAndGet();
- if (c % logFreq == 0)
- info(">>> Tx iteration started [cnt=" + c + ", keys=" + keys + ", " +
- "locNodeId=" + locNodeId + ']');
+ if (c % logFreq == 0) {
+ info(">>> Tx iteration started [cnt=" + c +
+ ", keys=" + keys +
+ ", locNodeId=" + locNodeId + ']');
+ }
for (int key : keys) {
int op = cacheOp();
@@ -664,17 +668,15 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
tx.commit();
}
}
- catch (ClusterTopologyException | CacheException e) {
- if (e instanceof CacheException
- && !(e.getCause() instanceof ClusterTopologyException))
- throw e;
-
+ catch (ClusterTopologyException | CacheException ignored) {
// It is ok if primary node leaves grid.
}
- if (c % logFreq == 0)
- info(">>> Tx iteration finished [cnt=" + c + ", keys=" + keys + ", " +
- "locNodeId=" + locNodeId + ']');
+ if (c % logFreq == 0) {
+ info(">>> Tx iteration finished [cnt=" + c +
+ ", keys=" + keys +
+ ", locNodeId=" + locNodeId + ']');
+ }
}
info(">>> " + Thread.currentThread().getName() + " finished.");
@@ -682,7 +684,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
catch (Exception e) {
err.compareAndSet(null, e);
- error("Failed to put value in cache.", e);
+ error("Unexpected exception in put-worker.", e);
}
}
}, "put-worker-" + i);
@@ -719,7 +721,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
catch (Exception e) {
err.compareAndSet(null, e);
- error("Failed to restart grid node.", e);
+ error("Unexpected exception in restart-worker.", e);
}
}
}, "restart-worker-" + i);
@@ -812,19 +814,21 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
tx.commit();
}
- catch (ClusterTopologyException ignored) {
+ catch (ClusterTopologyException | CacheException ignored) {
// It is ok if primary node leaves grid.
}
- if (c % logFreq == 0)
- info(">>> Tx iteration finished [cnt=" + c + ", keys=" + keys + ", " +
+ if (c % logFreq == 0) {
+ info(">>> Tx iteration finished [cnt=" + c +
+ ", keys=" + keys + ", " +
"locNodeId=" + locNodeId + ']');
+ }
}
}
catch (Exception e) {
err.compareAndSet(null, e);
- error("Failed to put value in cache.", e);
+ error("Unexpected exception in put-worker.", e);
}
}
}, "put-worker-" + i);
@@ -859,7 +863,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
catch (Exception e) {
err.compareAndSet(null, e);
- error("Failed to restart grid node.", e);
+ error("Unexpected exception in restart-worker.", e);
}
}
}, "restart-worker-" + i);
@@ -893,10 +897,12 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
* @param attempt Attempt.
*/
private void printFailureDetails(IgniteCache<Integer, String> c, int key, int attempt) {
+ Ignite ignite = c.unwrap(Ignite.class);
+
error("*** Failure details ***");
error("Key: " + key);
- error("Partition: " + c.getConfiguration(CacheConfiguration.class).getAffinity().partition(key));
+ error("Partition: " + ignite.affinity(c.getName()).partition(key));
error("Attempt: " + attempt);
- error("Node: " + c.unwrap(Ignite.class).cluster().localNode().id());
+ error("Node: " + ignite.cluster().localNode().id());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3394b4c5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNodeRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNodeRestartTest.java
index 8a2b19a..2096836 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNodeRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNodeRestartTest.java
@@ -43,8 +43,8 @@ public class GridCachePartitionedNodeRestartTest extends GridCacheAbstractNodeRe
cc.setWriteSynchronizationMode(FULL_ASYNC);
cc.setNearConfiguration(null);
cc.setStartSize(20);
- cc.setRebalanceMode(preloadMode);
- cc.setRebalanceBatchSize(preloadBatchSize);
+ cc.setRebalanceMode(rebalancMode);
+ cc.setRebalanceBatchSize(rebalancBatchSize);
cc.setAffinity(new RendezvousAffinityFunction(false, partitions));
cc.setBackups(backups);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3394b4c5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOptimisticTxNodeRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOptimisticTxNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOptimisticTxNodeRestartTest.java
index 62dfaa9..82da2ac 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOptimisticTxNodeRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOptimisticTxNodeRestartTest.java
@@ -42,8 +42,8 @@ public class GridCachePartitionedOptimisticTxNodeRestartTest extends GridCacheAb
cc.setCacheMode(PARTITIONED);
cc.setWriteSynchronizationMode(FULL_ASYNC);
cc.setStartSize(20);
- cc.setRebalanceMode(preloadMode);
- cc.setRebalanceBatchSize(preloadBatchSize);
+ cc.setRebalanceMode(rebalancMode);
+ cc.setRebalanceBatchSize(rebalancBatchSize);
cc.setAffinity(new RendezvousAffinityFunction(false, partitions));
cc.setBackups(backups);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3394b4c5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedNodeRestartSelfTest.java
index aa32559..0023160 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedNodeRestartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedNodeRestartSelfTest.java
@@ -34,6 +34,8 @@ public class GridCacheReplicatedNodeRestartSelfTest extends GridCacheAbstractNod
CacheConfiguration cc = defaultCacheConfiguration();
+ cc.setNearConfiguration(null);
+
cc.setAtomicityMode(atomicityMode());
cc.setName(CACHE_NAME);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3394b4c5/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
index 0ced1c8..b219f7f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
@@ -21,7 +21,6 @@ import junit.framework.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.processors.cache.distributed.replicated.*;
/**
* In-Memory Data Grid stability test suite on changing topology.
@@ -36,10 +35,9 @@ public class IgniteCacheRestartTestSuite extends TestSuite {
suite.addTestSuite(GridCachePartitionedTxSalvageSelfTest.class);
- // TODO: GG-7419: Enable when fixed.
- // suite.addTestSuite(GridCachePartitionedNodeRestartTest.class);
+ suite.addTestSuite(GridCachePartitionedNodeRestartTest.class);
+ // TODO: IGNITE-157.
// suite.addTestSuite(GridCachePartitionedOptimisticTxNodeRestartTest.class);
- // TODO: uncomment when fix GG-1969
// suite.addTestSuite(GridCacheReplicatedNodeRestartSelfTest.class);
suite.addTestSuite(IgniteCacheAtomicNodeRestartTest.class);
[06/18] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-sprint-4' into ignite-157-2
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-4' into ignite-157-2
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/45199ebf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/45199ebf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/45199ebf
Branch: refs/heads/ignite-841
Commit: 45199ebfc89763283226ea2824f877b1d043d36e
Parents: 5daaa27 47136b5
Author: sboikov <sb...@gridgain.com>
Authored: Wed Apr 29 17:54:22 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 29 17:54:22 2015 +0300
----------------------------------------------------------------------
modules/cloud/README.txt | 32 ++
modules/cloud/licenses/apache-2.0.txt | 202 +++++++++
modules/cloud/pom.xml | 106 +++++
.../cloud/TcpDiscoveryCloudIpFinder.java | 433 +++++++++++++++++++
.../tcp/ipfinder/cloud/package-info.java | 21 +
.../TcpDiscoveryCloudIpFinderSelfTest.java | 124 ++++++
.../tcp/ipfinder/cloud/package-info.java | 22 +
.../ignite/testsuites/IgniteCloudTestSuite.java | 112 +++++
.../ignite/codegen/MessageCodeGenerator.java | 26 +-
.../configuration/CacheConfiguration.java | 4 +-
.../ignite/internal/GridDirectCollection.java | 3 +
.../ignite/internal/IgniteComponentType.java | 36 +-
.../managers/communication/GridIoManager.java | 22 +-
.../communication/GridIoMessageFactory.java | 8 +
.../managers/indexing/GridIndexingManager.java | 14 +-
.../processors/cache/CacheObjectImpl.java | 2 +-
.../processors/cache/GridCacheMapEntry.java | 9 +-
.../processors/cache/GridCacheProcessor.java | 3 +-
.../processors/cache/GridCacheSwapManager.java | 35 +-
.../cache/query/GridCacheQueryManager.java | 22 +-
.../cache/query/GridCacheSqlQuery.java | 135 +++++-
.../cache/query/GridCacheTwoStepQuery.java | 8 +-
.../processors/query/GridQueryIndexing.java | 19 +-
.../processors/query/GridQueryProcessor.java | 78 +++-
.../messages/GridQueryNextPageResponse.java | 68 ++-
.../h2/twostep/messages/GridQueryRequest.java | 21 +-
.../util/spring/IgniteSpringHelper.java | 4 +-
.../internal/visor/cache/VisorCacheMetrics.java | 53 +--
.../cache/VisorCacheNearConfiguration.java | 4 +-
.../visor/cache/VisorCacheStartTask.java | 155 +++++++
.../internal/visor/util/VisorTaskUtils.java | 10 +
.../discovery/tcp/TcpDiscoverySpiAdapter.java | 10 +-
.../resources/META-INF/classnames.properties | 13 +
.../core/src/main/resources/ignite.properties | 2 +-
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +-
modules/gce/README.txt | 32 ++
modules/gce/licenses/apache-2.0.txt | 202 +++++++++
modules/gce/pom.xml | 92 ++++
.../gce/TcpDiscoveryGoogleStorageIpFinder.java | 380 ++++++++++++++++
.../tcp/ipfinder/gce/package-info.java | 22 +
...pDiscoveryGoogleStorageIpFinderSelfTest.java | 73 ++++
.../tcp/ipfinder/gce/package-info.java | 22 +
.../ignite/testsuites/IgniteGCETestSuite.java | 71 +++
.../processors/query/h2/IgniteH2Indexing.java | 169 +++++++-
.../query/h2/opt/GridH2AbstractKeyValueRow.java | 92 +---
.../query/h2/opt/GridH2KeyValueRowOffheap.java | 7 +-
.../query/h2/opt/GridH2KeyValueRowOnheap.java | 6 +-
.../query/h2/opt/GridH2RowDescriptor.java | 14 +-
.../processors/query/h2/opt/GridH2Table.java | 10 +-
.../query/h2/opt/GridH2ValueCacheObject.java | 191 ++++++++
.../query/h2/opt/GridLuceneIndex.java | 84 ++--
.../query/h2/twostep/GridMapQueryExecutor.java | 21 +-
.../query/h2/twostep/GridMergeIndex.java | 6 +-
.../h2/twostep/GridMergeIndexUnsorted.java | 4 +-
.../h2/twostep/GridReduceQueryExecutor.java | 13 +-
.../query/h2/twostep/GridResultPage.java | 80 +++-
.../query/h2/twostep/msg/GridH2Array.java | 124 ++++++
.../query/h2/twostep/msg/GridH2Boolean.java | 112 +++++
.../query/h2/twostep/msg/GridH2Byte.java | 113 +++++
.../query/h2/twostep/msg/GridH2Bytes.java | 113 +++++
.../query/h2/twostep/msg/GridH2CacheObject.java | 148 +++++++
.../query/h2/twostep/msg/GridH2Date.java | 115 +++++
.../query/h2/twostep/msg/GridH2Decimal.java | 134 ++++++
.../query/h2/twostep/msg/GridH2Double.java | 113 +++++
.../query/h2/twostep/msg/GridH2Float.java | 113 +++++
.../query/h2/twostep/msg/GridH2Geometry.java | 134 ++++++
.../query/h2/twostep/msg/GridH2Integer.java | 113 +++++
.../query/h2/twostep/msg/GridH2JavaObject.java | 113 +++++
.../query/h2/twostep/msg/GridH2Long.java | 113 +++++
.../query/h2/twostep/msg/GridH2Null.java | 78 ++++
.../query/h2/twostep/msg/GridH2Short.java | 113 +++++
.../query/h2/twostep/msg/GridH2String.java | 115 +++++
.../query/h2/twostep/msg/GridH2Time.java | 116 +++++
.../query/h2/twostep/msg/GridH2Timestamp.java | 133 ++++++
.../query/h2/twostep/msg/GridH2Uuid.java | 133 ++++++
.../h2/twostep/msg/GridH2ValueMessage.java | 49 +++
.../twostep/msg/GridH2ValueMessageFactory.java | 201 +++++++++
.../IgniteCacheQueryMultiThreadedSelfTest.java | 6 +-
.../h2/GridIndexingSpiAbstractSelfTest.java | 130 ++++--
.../util/spring/IgniteSpringHelperImpl.java | 2 +-
...gniteProjectionStartStopRestartSelfTest.java | 26 +-
.../commands/cache/VisorCacheCommand.scala | 2 +-
pom.xml | 5 +-
83 files changed, 5687 insertions(+), 389 deletions(-)
----------------------------------------------------------------------
[13/18] incubator-ignite git commit: #ignite-834:
IgniteCache.clearAll() throws NPE. #ignite-732: IgniteCache.size() should not
fail in case of topology changes.
Posted by sb...@apache.org.
#ignite-834: IgniteCache.clearAll() throws NPE.
#ignite-732: IgniteCache.size() should not fail in case of topology changes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/99c7e228
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/99c7e228
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/99c7e228
Branch: refs/heads/ignite-841
Commit: 99c7e228d12e25826f74d6d8706d158ec36004ed
Parents: 9ff8029
Author: ivasilinets <iv...@gridgain.com>
Authored: Wed May 6 12:30:57 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Wed May 6 12:30:57 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 511 +++++++++----------
.../resources/META-INF/classnames.properties | 6 +-
2 files changed, 248 insertions(+), 269 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/99c7e228/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 3f4e97b..6674993 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -21,10 +21,10 @@ import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cache.affinity.*;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.compute.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.affinity.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
@@ -1083,7 +1083,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
// Clear local cache synchronously.
clearLocally();
- clearRemotes(0, new GlobalClearAllCallable(name()));
+ clearRemotes(0, null);
}
/** {@inheritDoc} */
@@ -1091,7 +1091,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
// Clear local cache synchronously.
clearLocally(key);
- clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key)));
+ clearRemotes(0, Collections.singleton(key));
}
/** {@inheritDoc} */
@@ -1099,83 +1099,55 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
// Clear local cache synchronously.
clearLocallyAll(keys);
- clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), keys));
+ clearRemotes(0, keys);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> clearAsync(K key) {
- return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key)));
+ return clearKeysAsync(Collections.singleton(key));
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys) {
- return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), keys));
+ return clearKeysAsync(keys);
}
/**
* @param timeout Timeout for clearLocally all task in milliseconds (0 for never).
* Set it to larger value for large caches.
- * @param clearCall Global clear callable object.
+ * @param keys Keys to clear or {@code null} if all cache should be cleared.
* @throws IgniteCheckedException In case of cache could not be cleared on any of the nodes.
*/
- private void clearRemotes(long timeout, GlobalClearCallable clearCall) throws IgniteCheckedException {
- try {
- // Send job to remote nodes only.
- Collection<ClusterNode> nodes =
- ctx.grid().cluster().forCacheNodes(name(), true, true, false).forRemotes().nodes();
-
- IgniteInternalFuture<Object> fut = null;
-
- if (!nodes.isEmpty()) {
- ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout);
+ private void clearRemotes(long timeout, @Nullable final Set<? extends K> keys) throws IgniteCheckedException {
+ // Send job to remote nodes only.
+ Collection<ClusterNode> nodes =
+ ctx.grid().cluster().forCacheNodes(name(), true, true, false).forRemotes().nodes();
- fut = ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true);
- }
+ if (!nodes.isEmpty()) {
+ ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout);
- if (fut != null)
- fut.get();
- }
- catch (ClusterGroupEmptyCheckedException ignore) {
- if (log.isDebugEnabled())
- log.debug("All remote nodes left while cache clearLocally [cacheName=" + name() + "]");
- }
- catch (ComputeTaskTimeoutCheckedException e) {
- U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " +
- "'networkTimeout' configuration property) [cacheName=" + name() + "]");
+ ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
- throw e;
+ ctx.kernalContext().task().execute(new ClearTask(ctx, keys), null).get();
}
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> clearAsync() {
- return clearAsync(new GlobalClearAllCallable(name()));
+ return clearKeysAsync(null);
}
/**
- * @param clearCall Global clear callable object.
+ * @param keys Keys to clear or {@code null} if all cache should be cleared.
* @return Future.
*/
- private IgniteInternalFuture<?> clearAsync(GlobalClearCallable clearCall) {
+ private IgniteInternalFuture<?> clearKeysAsync(final Set<? extends K> keys) {
Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name(), true, true, false).nodes();
if (!nodes.isEmpty()) {
- IgniteInternalFuture<Object> fut =
- ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true);
+ ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
- return fut.chain(new CX1<IgniteInternalFuture<Object>, Object>() {
- @Override public Object applyx(IgniteInternalFuture<Object> fut) throws IgniteCheckedException {
- try {
- return fut.get();
- }
- catch (ClusterGroupEmptyCheckedException ignore) {
- if (log.isDebugEnabled())
- log.debug("All remote nodes left while cache clearLocally [cacheName=" + name() + "]");
-
- return null;
- }
- }
- });
+ return ctx.kernalContext().task().execute(new ClearTask(ctx, keys), null);
}
else
return new GridFinishedFuture<>();
@@ -3562,7 +3534,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Integer> sizeAsync(CachePeekMode[] peekModes) {
+ @Override public IgniteInternalFuture<Integer> sizeAsync(final CachePeekMode[] peekModes) {
assert peekModes != null;
PeekModes modes = parsePeekModes(peekModes, true);
@@ -3576,22 +3548,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (nodes.isEmpty())
return new GridFinishedFuture<>(0);
- IgniteInternalFuture<Collection<Integer>> fut =
- ctx.closures().broadcastNoFailover(new SizeCallable(ctx.name(), peekModes), null, nodes);
-
- return fut.chain(new CX1<IgniteInternalFuture<Collection<Integer>>, Integer>() {
- @Override public Integer applyx(IgniteInternalFuture<Collection<Integer>> fut)
- throws IgniteCheckedException {
- Collection<Integer> res = fut.get();
+ ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
- int totalSize = 0;
-
- for (Integer size : res)
- totalSize += size;
-
- return totalSize;
- }
- });
+ return ctx.kernalContext().task().execute(new SizeTask(ctx, peekModes), null);
}
/** {@inheritDoc} */
@@ -3909,50 +3868,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
- * Gets cache global size (with or without backups).
- *
- * @param primaryOnly {@code True} if only primary sizes should be included.
- * @return Global size.
- * @throws IgniteCheckedException If internal task execution failed.
- */
- private int globalSize(boolean primaryOnly) throws IgniteCheckedException {
- try {
- // Send job to remote nodes only.
- Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).forRemotes().nodes();
-
- IgniteInternalFuture<Collection<Integer>> fut = null;
-
- if (!nodes.isEmpty()) {
- ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, gridCfg.getNetworkTimeout());
-
- fut = ctx.closures().broadcastNoFailover(new GlobalSizeCallable(name(), primaryOnly), null, nodes);
- }
-
- // Get local value.
- int globalSize = primaryOnly ? primarySize() : size();
-
- if (fut != null) {
- for (Integer i : fut.get())
- globalSize += i;
- }
-
- return globalSize;
- }
- catch (ClusterGroupEmptyCheckedException ignore) {
- if (log.isDebugEnabled())
- log.debug("All remote nodes left while cache clearLocally [cacheName=" + name() + "]");
-
- return primaryOnly ? primarySize() : size();
- }
- catch (ComputeTaskTimeoutCheckedException e) {
- U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " +
- "'networkTimeout' configuration property) [cacheName=" + name() + "]");
-
- throw e;
- }
- }
-
- /**
* @param op Cache operation.
* @param <T> Return type.
* @return Operation result.
@@ -4893,67 +4808,32 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
- * Internal callable which performs clear operation on a cache with the given name.
- */
- @GridInternal
- private static abstract class GlobalClearCallable implements Callable<Object>, Externalizable {
- /** Cache name. */
- protected String cacheName;
-
- /** Injected grid instance. */
- @IgniteInstanceResource
- protected Ignite ignite;
-
- /**
- * Empty constructor for serialization.
- */
- public GlobalClearCallable() {
- // No-op.
- }
-
- /**
- * @param cacheName Cache name.
- */
- protected GlobalClearCallable(String cacheName) {
- this.cacheName = cacheName;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeString(out, cacheName);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- cacheName = U.readString(in);
- }
- }
-
- /**
* Global clear all.
*/
@GridInternal
- private static class GlobalClearAllCallable extends GlobalClearCallable {
+ private static class GlobalClearAllJob extends TopologyVersionAwareJob {
/** */
private static final long serialVersionUID = 0L;
/**
* Empty constructor for serialization.
*/
- public GlobalClearAllCallable() {
+ public GlobalClearAllJob() {
// No-op.
}
/**
* @param cacheName Cache name.
+ * @param topVer Affinity topology version.
*/
- private GlobalClearAllCallable(String cacheName) {
- super(cacheName);
+ private GlobalClearAllJob(String cacheName, AffinityTopologyVersion topVer) {
+ super(cacheName, topVer);
}
/** {@inheritDoc} */
- @Override public Object call() throws Exception {
- ((IgniteEx)ignite).cachex(cacheName).clearLocally();
+ @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) {
+ if (cache != null)
+ cache.clearLocally();
return null;
}
@@ -4963,7 +4843,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* Global clear keys.
*/
@GridInternal
- private static class GlobalClearKeySetCallable<K, V> extends GlobalClearCallable {
+ private static class GlobalClearKeySetJob<K> extends TopologyVersionAwareJob {
/** */
private static final long serialVersionUID = 0L;
@@ -4973,166 +4853,75 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
* Empty constructor for serialization.
*/
- public GlobalClearKeySetCallable() {
+ public GlobalClearKeySetJob() {
// No-op.
}
/**
* @param cacheName Cache name.
+ * @param topVer Affinity topology version.
* @param keys Keys to clear.
*/
- private GlobalClearKeySetCallable(String cacheName, Set<? extends K> keys) {
- super(cacheName);
+ private GlobalClearKeySetJob(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) {
+ super(cacheName, topVer);
this.keys = keys;
}
/** {@inheritDoc} */
- @Override public Object call() throws Exception {
- ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys);
+ @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) {
+ if (cache != null)
+ cache.clearLocallyAll(keys);
return null;
}
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- super.writeExternal(out);
-
- out.writeObject(keys);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- super.readExternal(in);
-
- keys = (Set<K>) in.readObject();
- }
}
/**
* Internal callable for global size calculation.
*/
@GridInternal
- private static class SizeCallable extends IgniteClosureX<Object, Integer> implements Externalizable {
+ private static class SizeJob extends TopologyVersionAwareJob {
/** */
private static final long serialVersionUID = 0L;
- /** Cache name. */
- private String cacheName;
-
/** Peek modes. */
private CachePeekMode[] peekModes;
- /** Injected grid instance. */
- @IgniteInstanceResource
- private Ignite ignite;
-
/**
* Required by {@link Externalizable}.
*/
- public SizeCallable() {
+ public SizeJob() {
// No-op.
}
/**
* @param cacheName Cache name.
+ * @param topVer Affinity topology version.
* @param peekModes Cache peek modes.
*/
- private SizeCallable(String cacheName, CachePeekMode[] peekModes) {
- this.cacheName = cacheName;
- this.peekModes = peekModes;
- }
-
- /** {@inheritDoc} */
- @Override public Integer applyx(Object o) throws IgniteCheckedException {
- IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName);
-
- assert cache != null : cacheName;
-
- return cache.localSize(peekModes);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("ForLoopReplaceableByForEach")
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeString(out, cacheName);
-
- out.writeInt(peekModes.length);
+ private SizeJob(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) {
+ super(cacheName, topVer);
- for (int i = 0; i < peekModes.length; i++)
- U.writeEnum(out, peekModes[i]);
+ this.peekModes = peekModes;
}
/** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- cacheName = U.readString(in);
-
- int len = in.readInt();
+ @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) {
+ if (cache == null)
+ return 0;
- peekModes = new CachePeekMode[len];
-
- for (int i = 0; i < len; i++)
- peekModes[i] = CachePeekMode.fromOrdinal(in.readByte());
+ try {
+ return cache.localSize(peekModes);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
}
/** {@inheritDoc} */
public String toString() {
- return S.toString(SizeCallable.class, this);
- }
- }
-
- /**
- * Internal callable which performs {@link IgniteInternalCache#size()} or {@link IgniteInternalCache#primarySize()}
- * operation on a cache with the given name.
- */
- @GridInternal
- private static class GlobalSizeCallable implements IgniteClosure<Object, Integer>, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Cache name. */
- private String cacheName;
-
- /** Primary only flag. */
- private boolean primaryOnly;
-
- /** Injected grid instance. */
- @IgniteInstanceResource
- private Ignite ignite;
-
- /**
- * Empty constructor for serialization.
- */
- public GlobalSizeCallable() {
- // No-op.
- }
-
- /**
- * @param cacheName Cache name.
- * @param primaryOnly Primary only flag.
- */
- private GlobalSizeCallable(String cacheName, boolean primaryOnly) {
- this.cacheName = cacheName;
- this.primaryOnly = primaryOnly;
- }
-
- /** {@inheritDoc} */
- @Override public Integer apply(Object o) {
- IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName);
-
- return primaryOnly ? cache.primarySize() : cache.size();
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeString(out, cacheName);
- out.writeBoolean(primaryOnly);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- cacheName = U.readString(in);
- primaryOnly = in.readBoolean();
+ return S.toString(SizeJob.class, this);
}
}
@@ -5697,4 +5486,194 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
metrics.addPutAndGetTimeNanos(System.nanoTime() - start);
}
}
+
+ /**
+ * Base class for compute jobs that must not access the local cache until the local node has reached
+ * the affinity topology version the job was created for.
+ * <p>
+ * If the local ready affinity version is older than {@link #topVer}, the job suspends itself with
+ * {@link ComputeJobContext#holdcc()} and asks to be resumed with {@link ComputeJobContext#callcc()}
+ * when the affinity ready future for {@link #topVer} completes. Subclasses implement the actual
+ * work in {@link #localExecute(IgniteInternalCache)}.
+ */
+ protected static abstract class TopologyVersionAwareJob extends ComputeJobAdapter {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Injected job context. */
+ @JobContextResource
+ protected ComputeJobContext jobCtx;
+
+ /** Injected grid instance. */
+ @IgniteInstanceResource
+ protected Ignite ignite;
+
+ /** Affinity topology version. */
+ protected AffinityTopologyVersion topVer;
+
+ /** Cache name. */
+ protected String cacheName;
+
+ /**
+ * Empty constructor for serialization.
+ */
+ public TopologyVersionAwareJob() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @param topVer Affinity topology version, must not be {@code null}.
+ */
+ public TopologyVersionAwareJob(String cacheName, AffinityTopologyVersion topVer) {
+ assert topVer != null;
+
+ this.cacheName = cacheName;
+ this.topVer = topVer;
+ }
+
+ /**
+ * Runs {@link #localExecute(IgniteInternalCache)} against the cache named {@link #cacheName} once the
+ * local affinity is ready for {@link #topVer}.
+ *
+ * @return Result of {@link #localExecute(IgniteInternalCache)}, or {@code null} if the job was
+ * suspended to wait for the topology version.
+ */
+ @Nullable @Override public final Object execute() {
+ // The job has been suspended: do not touch the cache now, the job is resumed via callcc().
+ if (!waitAffinityReadyFuture())
+ return null;
+
+ IgniteInternalCache cache = ((IgniteKernal)ignite).context().cache().cache(cacheName);
+
+ return localExecute(cache);
+ }
+
+ /**
+ * @param cache Cache, {@code null} if there is no cache with the given name on the local node.
+ * @return Local execution result.
+ */
+ @Nullable protected abstract Object localExecute(@Nullable IgniteInternalCache cache);
+
+ /**
+ * Holds (suspends) job execution if the local ready affinity version is older than {@link #topVer}.
+ *
+ * @return {@code True} if the job may proceed right away, {@code false} if it was suspended and
+ * will be resumed when the affinity ready future for {@link #topVer} completes.
+ */
+ private boolean waitAffinityReadyFuture() {
+ GridCacheProcessor cacheProc = ((IgniteKernal)ignite).context().cache();
+
+ AffinityTopologyVersion locTopVer = cacheProc.context().exchange().readyAffinityVersion();
+
+ if (locTopVer.compareTo(topVer) >= 0)
+ return true;
+
+ IgniteInternalFuture<?> fut = cacheProc.context().exchange().affinityReadyFuture(topVer);
+
+ if (fut == null || fut.isDone())
+ return true;
+
+ // Hold first: a future completing concurrently must not be able to call callcc() before holdcc().
+ jobCtx.holdcc();
+
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> t) {
+ jobCtx.callcc();
+ }
+ });
+
+ return false;
+ }
+ }
+
+ /**
+ * Compute task that sums local cache sizes over the nodes of the subgrid.
+ * One {@link SizeJob} is mapped to every node; results of failed jobs are skipped when reducing.
+ */
+ private static class SizeTask extends ComputeTaskAdapter<Object, Integer> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache context. */
+ private GridCacheContext ctx;
+
+ /** Peek modes. */
+ private CachePeekMode[] peekModes;
+
+ /**
+ * Empty constructor for serialization.
+ */
+ public SizeTask() {
+ // No-op.
+ }
+
+ /**
+ * @param ctx Cache context.
+ * @param peekModes Cache peek modes.
+ */
+ public SizeTask(GridCacheContext ctx, CachePeekMode[] peekModes) {
+ this.ctx = ctx;
+ this.peekModes = peekModes;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable Object arg) throws IgniteException {
+ Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
+
+ // Read the name and topology version once so that all jobs of this task carry the same values.
+ String cacheName = ctx.name();
+ AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
+
+ for (ClusterNode node : subgrid)
+ jobs.put(new SizeJob(cacheName, topVer, peekModes), node);
+
+ return jobs;
+ }
+
+ /**
+ * Always waits for the remaining jobs, including after a failed one; failed results are
+ * filtered out in {@link #reduce(List)}.
+ */
+ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+ return ComputeJobResultPolicy.WAIT;
+ }
+
+ /**
+ * @param results Job results.
+ * @return Sum of the sizes reported by the jobs that finished without an exception.
+ */
+ @Nullable @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteException {
+ int size = 0;
+
+ for (ComputeJobResult res : results) {
+ // The null check has to come before the result is dereferenced.
+ if (res == null || res.getException() != null)
+ continue;
+
+ Integer nodeSize = res.<Integer>getData();
+
+ // Guard against unboxing a missing result.
+ if (nodeSize != null)
+ size += nodeSize;
+ }
+
+ return size;
+ }
+ }
+
+ /**
+ * Compute task that clears the cache on every node of the subgrid: a {@link GlobalClearAllJob} is
+ * mapped when no keys were given, a {@link GlobalClearKeySetJob} otherwise.
+ */
+ private static class ClearTask<K> extends ComputeTaskAdapter<Object, Object> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache context. */
+ private GridCacheContext ctx;
+
+ /** Keys to clear, {@code null} to clear the whole cache. */
+ private Set<? extends K> keys;
+
+ /**
+ * Empty constructor for serialization.
+ */
+ public ClearTask() {
+ // No-op.
+ }
+
+ /**
+ * @param ctx Cache context.
+ * @param keys Keys to clear, {@code null} to clear the whole cache.
+ */
+ public ClearTask(GridCacheContext ctx, Set<? extends K> keys) {
+ this.ctx = ctx;
+ this.keys = keys;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable Object arg) throws IgniteException {
+ Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
+
+ // Read the name and topology version once so that all jobs of this task carry the same values.
+ String cacheName = ctx.name();
+ AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
+
+ for (ClusterNode node : subgrid) {
+ jobs.put(keys == null ?
+ new GlobalClearAllJob(cacheName, topVer) :
+ new GlobalClearKeySetJob<K>(cacheName, topVer, keys),
+ node);
+ }
+
+ return jobs;
+ }
+
+ /** Always waits for the remaining jobs, whatever the outcome of the received one. */
+ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+ return ComputeJobResultPolicy.WAIT;
+ }
+
+ /**
+ * @param results Job results, ignored.
+ * @return Always {@code null}, the task produces no value.
+ */
+ @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException {
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/99c7e228/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 35495ed..ff263cd 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -323,13 +323,13 @@ org.apache.ignite.internal.processors.cache.GridCacheAdapter$72
org.apache.ignite.internal.processors.cache.GridCacheAdapter$73
org.apache.ignite.internal.processors.cache.GridCacheAdapter$74
org.apache.ignite.internal.processors.cache.GridCacheAdapter$9
-org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearAllCallable
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearAllJob
org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearCallable
-org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearKeySetCallable
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearKeySetJob
org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalSizeCallable
org.apache.ignite.internal.processors.cache.GridCacheAdapter$LoadCacheClosure
org.apache.ignite.internal.processors.cache.GridCacheAdapter$LoadKeysCallable
-org.apache.ignite.internal.processors.cache.GridCacheAdapter$SizeCallable
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$SizeJob
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateGetTimeStatClosure
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutAndGetTimeStatClosure
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutTimeStatClosure
[04/18] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-sprint-4' into ignite-157-2
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-4' into ignite-157-2
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4b775f02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4b775f02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4b775f02
Branch: refs/heads/ignite-841
Commit: 4b775f02e2632fac72d44678f5ef13a04d9e1355
Parents: fc54ef7 18b4c39
Author: sboikov <sb...@gridgain.com>
Authored: Wed Apr 29 11:56:18 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 29 11:56:18 2015 +0300
----------------------------------------------------------------------
RELEASE_NOTES.txt | 13 +-
.../spi/checkpoint/s3/S3CheckpointSpi.java | 2 +-
.../clients/src/test/resources/spring-cache.xml | 4 +-
.../src/test/resources/spring-server-node.xml | 4 +-
.../test/resources/spring-server-ssl-node.xml | 4 +-
.../java/org/apache/ignite/IgniteLogger.java | 8 +-
.../java/org/apache/ignite/IgniteServices.java | 2 +-
.../main/java/org/apache/ignite/Ignition.java | 2 +-
.../eviction/sorted/SortedEvictionPolicy.java | 2 +-
.../configuration/ConnectorConfiguration.java | 2 +-
.../configuration/IgniteConfiguration.java | 2 +-
.../ignite/internal/GridJobContextImpl.java | 7 +-
.../org/apache/ignite/internal/IgnitionEx.java | 2 +-
.../internal/cluster/ClusterGroupAdapter.java | 16 +
.../managers/communication/GridIoManager.java | 6 +-
.../deployment/GridDeploymentManager.java | 2 +-
.../GridDeploymentPerVersionStore.java | 3 +-
.../cache/GridCacheSharedContext.java | 2 +-
.../processors/cache/IgniteCacheProxy.java | 10 +-
.../distributed/dht/GridDhtLockRequest.java | 38 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 10 +-
.../dht/preloader/GridDhtPreloader.java | 2 +-
.../distributed/near/GridNearLockRequest.java | 54 +-
.../ignite/internal/util/GridJavaProcess.java | 4 +
.../util/tostring/GridToStringBuilder.java | 2 +-
.../ignite/internal/visor/cache/VisorCache.java | 92 ++--
.../visor/cache/VisorCacheConfiguration.java | 7 -
.../visor/cache/VisorCacheNodesTask.java | 74 +++
.../cache/VisorCacheStoreConfiguration.java | 35 --
.../cache/VisorCacheTypeFieldMetadata.java | 36 +-
.../visor/cache/VisorCacheTypeMetadata.java | 99 +---
.../internal/visor/igfs/VisorIgfsMetrics.java | 128 +----
.../visor/node/VisorBasicConfiguration.java | 11 +
.../visor/node/VisorNodeDataCollectorJob.java | 2 +-
.../node/VisorNodeEventsCollectorTask.java | 58 +-
.../internal/visor/query/VisorQueryArg.java | 19 +-
.../internal/visor/query/VisorQueryCursor.java | 1 -
.../internal/visor/query/VisorQueryJob.java | 9 +-
.../internal/visor/query/VisorQueryTask.java | 41 --
.../internal/visor/util/VisorEventMapper.java | 13 +
.../internal/visor/util/VisorTaskUtils.java | 2 +-
.../apache/ignite/logger/java/JavaLogger.java | 12 +-
.../apache/ignite/marshaller/Marshaller.java | 14 +-
.../ignite/marshaller/jdk/JdkMarshaller.java | 10 +-
.../optimized/OptimizedMarshaller.java | 8 +-
.../apache/ignite/resources/LoggerResource.java | 2 +-
.../apache/ignite/resources/SpringResource.java | 2 +-
.../org/apache/ignite/services/Service.java | 2 +-
.../ignite/services/ServiceConfiguration.java | 2 +-
.../checkpoint/cache/CacheCheckpointSpi.java | 2 +-
.../spi/checkpoint/jdbc/JdbcCheckpointSpi.java | 2 +-
.../sharedfs/SharedFsCheckpointSpi.java | 4 +-
.../fifoqueue/FifoQueueCollisionSpi.java | 10 +-
.../jobstealing/JobStealingCollisionSpi.java | 14 +-
.../PriorityQueueCollisionSpi.java | 6 +-
.../communication/tcp/TcpCommunicationSpi.java | 12 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 10 +-
.../memory/MemoryEventStorageSpi.java | 10 +-
.../spi/failover/always/AlwaysFailoverSpi.java | 10 +-
.../jobstealing/JobStealingFailoverSpi.java | 6 +-
.../spi/failover/never/NeverFailoverSpi.java | 8 +-
.../apache/ignite/spi/indexing/IndexingSpi.java | 4 +-
.../adaptive/AdaptiveLoadBalancingSpi.java | 12 +-
.../roundrobin/RoundRobinLoadBalancingSpi.java | 10 +-
.../WeightedRandomLoadBalancingSpi.java | 10 +-
.../spi/swapspace/file/FileSwapSpaceSpi.java | 10 +-
.../src/test/config/load/merge-sort-base.xml | 2 +-
.../internal/GridDiscoveryEventSelfTest.java | 30 +-
...ProjectionForCachesOnDaemonNodeSelfTest.java | 147 +++++
.../distributed/GridCacheLockAbstractTest.java | 75 +++
.../DataStreamerMultiThreadedSelfTest.java | 2 +
.../logger/java/IgniteJavaLoggerTest.java | 65 ---
.../ignite/logger/java/JavaLoggerTest.java | 65 +++
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 5 +-
.../junits/logger/GridTestLog4jLogger.java | 10 +-
.../ignite/testsuites/IgniteBasicTestSuite.java | 2 +
.../testsuites/IgniteLoggingSelfTestSuite.java | 2 +-
.../processors/hadoop/HadoopProcessor.java | 2 +-
.../ignite/logger/jcl/IgniteJclLogger.java | 167 ------
.../org/apache/ignite/logger/jcl/JclLogger.java | 167 ++++++
.../ignite/logger/jcl/IgniteJclLoggerTest.java | 48 --
.../apache/ignite/logger/jcl/JclLoggerTest.java | 48 ++
.../ignite/testsuites/IgniteJclTestSuite.java | 2 +-
.../apache/ignite/logger/log4j/Log4JLogger.java | 8 +-
.../ignite/logger/slf4j/GridSlf4jLogger.java | 138 -----
.../apache/ignite/logger/slf4j/Slf4jLogger.java | 138 +++++
.../spi/deployment/uri/UriDeploymentSpi.java | 2 +-
.../ignite/visor/commands/VisorConsole.scala | 314 ++++++-----
.../visor/commands/VisorConsoleCommand.scala | 77 ---
.../ignite/visor/commands/VisorTextTable.scala | 539 ------------------
.../visor/commands/ack/VisorAckCommand.scala | 42 +-
.../commands/alert/VisorAlertCommand.scala | 35 +-
.../commands/cache/VisorCacheClearCommand.scala | 51 +-
.../commands/cache/VisorCacheCommand.scala | 34 +-
.../commands/cache/VisorCacheScanCommand.scala | 60 +-
.../commands/cache/VisorCacheStopCommand.scala | 30 +-
.../commands/cache/VisorCacheSwapCommand.scala | 66 +--
.../commands/common/VisorConsoleCommand.scala | 90 +++
.../visor/commands/common/VisorTextTable.scala | 543 +++++++++++++++++++
.../config/VisorConfigurationCommand.scala | 438 +++++++--------
.../commands/deploy/VisorDeployCommand.scala | 47 +-
.../commands/disco/VisorDiscoveryCommand.scala | 58 +-
.../commands/events/VisorEventsCommand.scala | 338 +++++-------
.../visor/commands/gc/VisorGcCommand.scala | 130 ++---
.../visor/commands/kill/VisorKillCommand.scala | 53 +-
.../visor/commands/node/VisorNodeCommand.scala | 47 +-
.../visor/commands/ping/VisorPingCommand.scala | 41 +-
.../commands/start/VisorStartCommand.scala | 34 +-
.../commands/tasks/VisorTasksCommand.scala | 76 +--
.../commands/top/VisorTopologyCommand.scala | 36 +-
.../visor/commands/vvm/VisorVvmCommand.scala | 32 +-
.../scala/org/apache/ignite/visor/visor.scala | 286 +++++++---
.../ignite/visor/VisorTextTableSpec.scala | 3 +-
pom.xml | 5 +-
114 files changed, 2777 insertions(+), 2875 deletions(-)
----------------------------------------------------------------------
[05/18] incubator-ignite git commit: # ignite-157-2 Tests and fix for
tx recovery issue
Posted by sb...@apache.org.
# ignite-157-2 Tests and fix for tx recovery issue
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5daaa278
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5daaa278
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5daaa278
Branch: refs/heads/ignite-841
Commit: 5daaa278afcca7e00be5002e3d5247661c6ba474
Parents: 4b775f0
Author: sboikov <sb...@gridgain.com>
Authored: Wed Apr 29 12:48:27 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 29 17:13:48 2015 +0300
----------------------------------------------------------------------
...ridCacheOptimisticCheckPreparedTxFuture.java | 78 +++-
...idCacheOptimisticCheckPreparedTxRequest.java | 47 +-
.../GridDistributedTxRemoteAdapter.java | 2 +-
.../cache/transactions/IgniteInternalTx.java | 5 +-
.../cache/transactions/IgniteTxAdapter.java | 2 +-
.../cache/transactions/IgniteTxHandler.java | 3 +-
.../transactions/IgniteTxLocalAdapter.java | 2 +-
.../cache/transactions/IgniteTxManager.java | 47 +-
...xOriginatingNodeFailureAbstractSelfTest.java | 2 +-
...rDisabledPrimaryNodeFailureRecoveryTest.java | 31 ++
...rtitionedPrimaryNodeFailureRecoveryTest.java | 31 ++
...woBackupsPrimaryNodeFailureRecoveryTest.java | 37 ++
...ePrimaryNodeFailureRecoveryAbstractTest.java | 454 +++++++++++++++++++
.../IgniteCacheTxRecoverySelfTestSuite.java | 4 +
14 files changed, 713 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
index 8a14b48..3e345f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
@@ -70,6 +70,9 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
/** Transaction nodes mapping. */
private final Map<UUID, Collection<UUID>> txNodes;
+ /** */
+ private final boolean nearTxCheck;
+
/**
* @param cctx Context.
* @param tx Transaction.
@@ -77,8 +80,11 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
* @param txNodes Transaction mapping.
*/
@SuppressWarnings("ConstantConditions")
- public GridCacheOptimisticCheckPreparedTxFuture(GridCacheSharedContext<K, V> cctx, IgniteInternalTx tx,
- UUID failedNodeId, Map<UUID, Collection<UUID>> txNodes) {
+ public GridCacheOptimisticCheckPreparedTxFuture(GridCacheSharedContext<K, V> cctx,
+ IgniteInternalTx tx,
+ UUID failedNodeId,
+ Map<UUID, Collection<UUID>> txNodes)
+ {
super(cctx.kernalContext(), CU.boolReducer());
this.cctx = cctx;
@@ -114,6 +120,10 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
}
}
}
+
+ UUID nearNodeId = tx.eventNodeId();
+
+ nearTxCheck = !failedNodeId.equals(nearNodeId) && cctx.discovery().alive(nearNodeId);
}
/**
@@ -121,6 +131,48 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
*/
@SuppressWarnings("ConstantConditions")
public void prepare() {
+ if (nearTxCheck) {
+ UUID nearNodeId = tx.eventNodeId();
+
+ if (cctx.localNodeId().equals(nearNodeId)) {
+ IgniteInternalFuture<Boolean> fut = cctx.tm().txCommitted(tx.nearXidVersion());
+
+ fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+ try {
+ onDone(fut.get());
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+ });
+ }
+ else {
+ MiniFuture fut = new MiniFuture(tx.eventNodeId());
+
+ add(fut);
+
+ GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(
+ tx,
+ 0,
+ true,
+ futureId(),
+ fut.futureId());
+
+ try {
+ cctx.io().send(nearNodeId, req, tx.ioPolicy());
+ }
+ catch (IgniteCheckedException e) {
+ fut.onError(e);
+ }
+
+ markInitialized();
+ }
+
+ return;
+ }
+
// First check transactions on local node.
int locTxNum = nodeTransactions(cctx.localNodeId());
@@ -206,6 +258,7 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(tx,
nodeTransactions(id),
+ false,
futureId(),
fut.futureId());
@@ -228,7 +281,11 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
add(fut);
GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(
- tx, nodeTransactions(nodeId), futureId(), fut.futureId());
+ tx,
+ nodeTransactions(nodeId),
+ false,
+ futureId(),
+ fut.futureId());
try {
cctx.io().send(nodeId, req, tx.ioPolicy());
@@ -341,11 +398,18 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
cctx.tm().finishOptimisticTxOnRecovery(tx, res);
}
else {
- if (log.isDebugEnabled())
- log.debug("Failed to check prepared transactions, " +
- "invalidating transaction [err=" + err + ", tx=" + tx + ']');
+ if (nearTxCheck) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to check transaction on near node, " +
+ "ignoring [err=" + err + ", tx=" + tx + ']');
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Failed to check prepared transactions, " +
+ "invalidating transaction [err=" + err + ", tx=" + tx + ']');
- cctx.tm().salvageTx(tx);
+ cctx.tm().salvageTx(tx);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
index e83db66..4f2a1d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
@@ -27,8 +27,7 @@ import java.io.*;
import java.nio.*;
/**
- * Message sent to check that transactions related to some optimistic transaction
- * were prepared on remote node.
+ * Message sent to check whether transactions related to a given transaction were prepared on a remote node.
*/
public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBaseMessage {
/** */
@@ -49,6 +48,9 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
/** System transaction flag. */
private boolean sys;
+ /** {@code True} if should check only tx on near node. */
+ private boolean nearTxCheck;
+
/**
* Empty constructor required by {@link Externalizable}
*/
@@ -59,11 +61,16 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
/**
* @param tx Transaction.
* @param txNum Expected number of transactions on remote node.
+ * @param nearTxCheck {@code True} if should check only tx on near node.
* @param futId Future ID.
* @param miniId Mini future ID.
*/
- public GridCacheOptimisticCheckPreparedTxRequest(IgniteInternalTx tx, int txNum, IgniteUuid futId,
- IgniteUuid miniId) {
+ public GridCacheOptimisticCheckPreparedTxRequest(IgniteInternalTx tx,
+ int txNum,
+ boolean nearTxCheck,
+ IgniteUuid futId,
+ IgniteUuid miniId)
+ {
super(tx.xidVersion(), 0);
nearXidVer = tx.nearXidVersion();
@@ -72,6 +79,14 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
this.futId = futId;
this.miniId = miniId;
this.txNum = txNum;
+ this.nearTxCheck = nearTxCheck;
+ }
+
+ /**
+ * @return {@code True} if should check only tx on near node.
+ */
+ public boolean nearTxCheck() {
+ return nearTxCheck;
}
/**
@@ -137,18 +152,24 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
writer.incrementState();
case 10:
- if (!writer.writeMessage("nearXidVer", nearXidVer))
+ if (!writer.writeBoolean("nearTxCheck", nearTxCheck))
return false;
writer.incrementState();
case 11:
- if (!writer.writeBoolean("sys", sys))
+ if (!writer.writeMessage("nearXidVer", nearXidVer))
return false;
writer.incrementState();
case 12:
+ if (!writer.writeBoolean("sys", sys))
+ return false;
+
+ writer.incrementState();
+
+ case 13:
if (!writer.writeInt("txNum", txNum))
return false;
@@ -187,7 +208,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
reader.incrementState();
case 10:
- nearXidVer = reader.readMessage("nearXidVer");
+ nearTxCheck = reader.readBoolean("nearTxCheck");
if (!reader.isLastRead())
return false;
@@ -195,7 +216,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
reader.incrementState();
case 11:
- sys = reader.readBoolean("sys");
+ nearXidVer = reader.readMessage("nearXidVer");
if (!reader.isLastRead())
return false;
@@ -203,6 +224,14 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
reader.incrementState();
case 12:
+ sys = reader.readBoolean("sys");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 13:
txNum = reader.readInt("txNum");
if (!reader.isLastRead())
@@ -222,7 +251,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 13;
+ return 14;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 5c75390..3215138 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -206,7 +206,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
/** {@inheritDoc} */
- @Override public <K, V> GridTuple<CacheObject> peek(GridCacheContext cacheCtx,
+ @Override public GridTuple<CacheObject> peek(GridCacheContext cacheCtx,
boolean failFast,
KeyCacheObject key,
CacheEntryPredicate[] filter)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 30367c5..8dc07cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -338,8 +338,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
/**
* Gets node ID which directly started this transaction. In case of DHT local transaction it will be
- * near node ID, in case of DHT remote transaction it will be primary node ID, in case of replicated remote
- * transaction it will be starter node ID.
+ * near node ID, in case of DHT remote transaction it will be primary node ID.
*
* @return Originating node ID.
*/
@@ -507,7 +506,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
* @return Current value for the key within transaction.
* @throws GridCacheFilterFailedException If filter failed and failFast is {@code true}.
*/
- @Nullable public <K, V> GridTuple<CacheObject> peek(
+ @Nullable public GridTuple<CacheObject> peek(
GridCacheContext ctx,
boolean failFast,
KeyCacheObject key,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 1c02356..82d68b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1964,7 +1964,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
- @Nullable @Override public <K, V> GridTuple<CacheObject> peek(GridCacheContext ctx,
+ @Nullable @Override public GridTuple<CacheObject> peek(GridCacheContext ctx,
boolean failFast,
KeyCacheObject key,
@Nullable CacheEntryPredicate[] filter) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 6b45fee..6843075 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1183,7 +1183,8 @@ public class IgniteTxHandler {
if (log.isDebugEnabled())
log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']');
- IgniteInternalFuture<Boolean> fut = ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
+ IgniteInternalFuture<Boolean> fut = req.nearTxCheck() ? ctx.tm().txCommitted(req.nearXidVersion()) :
+ ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
if (fut == null || fut.isDone()) {
boolean prepared;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index dfce09c..fc3efba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -330,7 +330,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/** {@inheritDoc} */
@SuppressWarnings({"RedundantTypeArguments"})
- @Nullable @Override public <K, V> GridTuple<CacheObject> peek(
+ @Nullable @Override public GridTuple<CacheObject> peek(
GridCacheContext cacheCtx,
boolean failFast,
KeyCacheObject key,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index c494602..19efc5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -727,14 +727,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
- * @param txId Transaction ID.
- * @return Transaction with given ID.
- */
- @Nullable public IgniteInternalTx txx(GridCacheVersion txId) {
- return idMap.get(txId);
- }
-
- /**
* Handles prepare stage of 2PC.
*
* @param tx Transaction to prepare.
@@ -1770,6 +1762,45 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param ver Version.
+ * @return Future for flag indicating if transaction was committed.
+ */
+ public IgniteInternalFuture<Boolean> txCommitted(GridCacheVersion ver) {
+ final GridFutureAdapter<Boolean> resFut = new GridFutureAdapter<>();
+
+ final IgniteInternalTx tx = cctx.tm().tx(ver);
+
+ if (tx != null) {
+ assert tx.near() && tx.local() : tx;
+
+ if (log.isDebugEnabled())
+ log.debug("Found near transaction, will wait for completion: " + tx);
+
+ tx.finishFuture().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
+ @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+ TransactionState state = tx.state();
+
+ if (log.isDebugEnabled())
+ log.debug("Near transaction finished with state: " + state);
+
+ resFut.onDone(state == COMMITTED);
+ }
+ });
+
+ return resFut;
+ }
+
+ Boolean committed = completedVers.get(ver);
+
+ if (log.isDebugEnabled())
+ log.debug("Near transaction committed: " + committed);
+
+ resFut.onDone(committed != null && committed);
+
+ return resFut;
+ }
+
+ /**
* @param nearVer Near version ID.
* @param txNum Number of transactions.
* @param fut Result future.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
index 00bd43f..d664aa8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
@@ -156,7 +156,7 @@ public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri
TransactionProxyImpl tx = (TransactionProxyImpl)txIgniteNode.transactions().txStart();
- IgniteInternalTx txEx = GridTestUtils.getFieldValue(tx, "tx");
+ IgniteInternalTx txEx = tx.tx();
assertTrue(txEx.optimistic());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.java
new file mode 100644
index 0000000..62d9b79
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ * Runs the primary node failure recovery tests with no near cache configuration ({@code nearConfiguration()} returns {@code null}).
+ */
+public class IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest
+ extends IgniteCachePrimaryNodeFailureRecoveryAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.java
new file mode 100644
index 0000000..a40c989
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ * Runs the primary node failure recovery tests with a default {@link NearCacheConfiguration}.
+ */
+public class IgniteCachePartitionedPrimaryNodeFailureRecoveryTest
+ extends IgniteCachePrimaryNodeFailureRecoveryAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return new NearCacheConfiguration();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java
new file mode 100644
index 0000000..70eef1d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ * Same tests as {@link IgniteCachePartitionedPrimaryNodeFailureRecoveryTest}, but with the cache reconfigured from one backup to two.
+ */
+public class IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest
+ extends IgniteCachePartitionedPrimaryNodeFailureRecoveryTest {
+ /** {@inheritDoc} */
+ @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+ CacheConfiguration ccfg = super.cacheConfiguration(gridName);
+
+ assertEquals(1, ccfg.getBackups());
+
+ ccfg.setBackups(2);
+
+ return ccfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
new file mode 100644
index 0000000..7a393d8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.transactions.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+import static org.apache.ignite.transactions.TransactionState.*;
+
+/**
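+ * Tests transaction outcome when a primary node (and optionally the originating node) is stopped while the transaction is being prepared.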
+ */
+public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends IgniteCacheAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 4;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setCommunicationSpi(new TestCommunicationSpi());
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ startGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOptimisticPrimaryNodeFailureRecovery1() throws Exception {
+ primaryNodeFailure(false, false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOptimisticPrimaryNodeFailureRecovery2() throws Exception {
+ primaryNodeFailure(true, false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOptimisticPrimaryNodeFailureRollback1() throws Exception {
+ primaryNodeFailure(false, true, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOptimisticPrimaryNodeFailureRollback2() throws Exception {
+ primaryNodeFailure(true, true, true);
+ }
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticPrimaryNodeFailureRecovery1() throws Exception {
+ primaryNodeFailure(false, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticPrimaryNodeFailureRecovery2() throws Exception {
+ primaryNodeFailure(true, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticPrimaryNodeFailureRollback1() throws Exception {
+ primaryNodeFailure(false, true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticPrimaryNodeFailureRollback2() throws Exception {
+ primaryNodeFailure(true, true, false);
+ }
+
+ /**
+ * @param locBackupKey If {@code true} uses recovery for local backup key.
+ * @param rollback If {@code true} tests rollback after primary node failure.
+ * @param optimistic If {@code true} tests optimistic transaction.
+ * @throws Exception If failed.
+ */
+ private void primaryNodeFailure(boolean locBackupKey, final boolean rollback, boolean optimistic) throws Exception {
+ IgniteCache<Integer, Integer> cache0 = jcache(0);
+ IgniteCache<Integer, Integer> cache2 = jcache(2);
+
+ Affinity<Integer> aff = ignite(0).affinity(null);
+
+ Integer key0 = null;
+
+ for (int key = 0; key < 10_000; key++) {
+ if (aff.isPrimary(ignite(1).cluster().localNode(), key)) {
+ if (locBackupKey == aff.isBackup(ignite(0).cluster().localNode(), key)) {
+ key0 = key;
+
+ break;
+ }
+ }
+ }
+
+ assertNotNull(key0);
+
+ final Integer key1 = key0;
+ final Integer key2 = primaryKey(cache2);
+
+ TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
+
+ IgniteTransactions txs = ignite(0).transactions();
+
+ try (Transaction tx = txs.txStart(optimistic ? OPTIMISTIC : PESSIMISTIC, REPEATABLE_READ)) {
+ log.info("Put key1: " + key1);
+
+ cache0.put(key1, key1);
+
+ log.info("Put key2: " + key2);
+
+ cache0.put(key2, key2);
+
+ log.info("Start prepare.");
+
+ IgniteInternalTx txEx = ((TransactionProxyImpl)tx).tx();
+
+ commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
+
+ IgniteInternalFuture<IgniteInternalTx> prepFut = txEx.prepareAsync();
+
+ waitPrepared(ignite(1));
+
+ log.info("Stop one primary node.");
+
+ stopGrid(1);
+
+ U.sleep(1000); // Wait some time to catch possible issues in tx recovery.
+
+ commSpi.stopBlock();
+
+ prepFut.get(10_000);
+
+ if (rollback) {
+ log.info("Rollback.");
+
+ tx.rollback();
+ }
+ else {
+ log.info("Commit.");
+
+ tx.commit();
+ }
+ }
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ checkKey(key1, rollback);
+ checkKey(key2, rollback);
+
+ return true;
+ }
+ catch (AssertionError e) {
+ log.info("Check failed: " + e);
+
+ return false;
+ }
+ }
+ }, 5000);
+
+ checkKey(key1, rollback);
+ checkKey(key2, rollback);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery() throws Exception {
+ primaryAndOriginatingNodeFailure(false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOptimisticPrimaryAndOriginatingNodeFailureRollback() throws Exception {
+ primaryAndOriginatingNodeFailure(true, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticPrimaryAndOriginatingNodeFailureRecovery() throws Exception {
+ primaryAndOriginatingNodeFailure(false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticPrimaryAndOriginatingNodeFailureRollback() throws Exception {
+ primaryAndOriginatingNodeFailure(true, false);
+ }
+
+ /**
+ * @param rollback If {@code true} tests rollback after primary node failure.
+ * @param optimistic If {@code true} tests optimistic transaction.
+ * @throws Exception If failed.
+ */
+ private void primaryAndOriginatingNodeFailure(final boolean rollback, boolean optimistic) throws Exception {
+ IgniteCache<Integer, Integer> cache0 = jcache(0);
+ IgniteCache<Integer, Integer> cache1 = jcache(1);
+ IgniteCache<Integer, Integer> cache2 = jcache(2);
+
+ final Integer key1 = primaryKey(cache1);
+ final Integer key2 = primaryKey(cache2);
+
+ TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
+
+ IgniteTransactions txs = ignite(0).transactions();
+
+ Transaction tx = txs.txStart(optimistic ? OPTIMISTIC : PESSIMISTIC, REPEATABLE_READ);
+
+ log.info("Put key1: " + key1);
+
+ cache0.put(key1, key1);
+
+ log.info("Put key2: " + key2);
+
+ cache0.put(key2, key2);
+
+ log.info("Start prepare.");
+
+ IgniteInternalTx txEx = ((TransactionProxyImpl)tx).tx();
+
+ commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
+
+ IgniteInternalFuture<IgniteInternalTx> prepFut = txEx.prepareAsync();
+
+ waitPrepared(ignite(1));
+
+ log.info("Stop one primary node.");
+
+ stopGrid(1);
+
+ U.sleep(1000); // Wait some time to catch possible issues in tx recovery.
+
+ if (!rollback) {
+ commSpi.stopBlock();
+
+ prepFut.get(10_000);
+ }
+
+ log.info("Stop originating node.");
+
+ stopGrid(0);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ checkKey(key1, rollback);
+ checkKey(key2, rollback);
+
+ return true;
+ }
+ catch (AssertionError e) {
+ log.info("Check failed: " + e);
+
+ return false;
+ }
+ }
+ }, 5000);
+
+ checkKey(key1, rollback);
+ checkKey(key2, rollback);
+ }
+
+ /**
+ * @param key Key.
+ * @param expNull {@code True} if {@code null} value is expected.
+ */
+ private void checkKey(Integer key, boolean expNull) {
+ Affinity<Integer> aff = ignite(2).affinity(null);
+
+ Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(key);
+
+ assertFalse(nodes.isEmpty());
+
+ for (ClusterNode node : nodes) {
+ Ignite ignite = grid(node);
+
+ IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+ if (expNull)
+ assertNull("Unexpected value for: " + ignite.name(), cache.localPeek(key));
+ else
+ assertEquals("Unexpected value for: " + ignite.name(), key, cache.localPeek(key));
+ }
+ }
+
+ /**
+ * @param ignite Node.
+ * @throws Exception If failed.
+ */
+ private void waitPrepared(Ignite ignite) throws Exception {
+ final IgniteTxManager tm = ((IgniteKernal)ignite).context().cache().context().tm();
+
+ boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ GridDhtTxLocal locTx = null;
+
+ for (IgniteInternalTx tx : tm.txs()) {
+ if (tx instanceof GridDhtTxLocal) {
+ assertNull("Only one tx is expected.", locTx);
+
+ locTx = (GridDhtTxLocal)tx;
+ }
+ }
+
+ log.info("Wait for tx, state: " + (locTx != null ? locTx.state() : null));
+
+ return locTx != null && locTx.state() == PREPARED;
+ }
+ }, 5000);
+
+ assertTrue("Failed to wait for tx.", wait);
+ }
+
+ /**
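+ * Communication SPI that can hold back {@code GridNearTxPrepareRequest} messages addressed to one node and send them later.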
+ */
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ /** Logger. */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** */
+ private UUID blockNodeId;
+
+ /** */
+ private List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>();
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+ if (msg instanceof GridIoMessage) {
+ Object msg0 = ((GridIoMessage)msg).message();
+
+ if (msg0 instanceof GridNearTxPrepareRequest) {
+ synchronized (this) {
+ if (blockNodeId != null && blockNodeId.equals(node.id())) {
+ log.info("Block message: " + msg0);
+
+ blockedMsgs.add(new T2<>(node, (GridIoMessage)msg));
+
+ return;
+ }
+ }
+ }
+ }
+
+ super.sendMessage(node, msg);
+ }
+
+ /**
+ * @param nodeId Node to hold prepare requests for; written under the same monitor that {@code sendMessage} reads it with.
+ */
+ synchronized void blockMessages(UUID nodeId) {
+ blockNodeId = nodeId;
+ }
+
+ /**
+ * Stops blocking and sends every message held back so far; the buffer is then cleared so a later call cannot resend them.
+ */
+ void stopBlock() {
+ synchronized (this) {
+ blockNodeId = null;
+
+ for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) {
+ log.info("Send blocked message: " + msg.get2().message());
+ super.sendMessage(msg.get1(), msg.get2());
+ }
+ blockedMsgs.clear();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
index e832099..1bd0e5f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
@@ -33,6 +33,10 @@ public class IgniteCacheTxRecoverySelfTestSuite extends TestSuite {
public static TestSuite suite() throws Exception {
TestSuite suite = new TestSuite("Cache tx recovery test suite");
+ suite.addTestSuite(IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.class);
+ suite.addTestSuite(IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.class);
+ suite.addTestSuite(IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.class);
+
suite.addTestSuite(GridCachePartitionedTxOriginatingNodeFailureSelfTest.class);
suite.addTestSuite(GridCachePartitionedNearDisabledTxOriginatingNodeFailureSelfTest.class);
suite.addTestSuite(GridCacheReplicatedTxOriginatingNodeFailureSelfTest.class);
[15/18] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-157-2' into ignite-sprint-4
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-157-2' into ignite-sprint-4
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ba210bbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ba210bbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ba210bbb
Branch: refs/heads/ignite-841
Commit: ba210bbbfec66f4ff8913550e1e3b43ba65cf0e1
Parents: 99c7e22 f5f95fb
Author: sboikov <sb...@gridgain.com>
Authored: Wed May 6 12:41:02 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed May 6 12:41:02 2015 +0300
----------------------------------------------------------------------
.../ignite/codegen/MessageCodeGenerator.java | 4 +-
.../communication/GridIoMessageFactory.java | 4 +-
...ridCacheOptimisticCheckPreparedTxFuture.java | 434 ---------------
...idCacheOptimisticCheckPreparedTxRequest.java | 232 --------
...dCacheOptimisticCheckPreparedTxResponse.java | 179 -------
.../distributed/GridCacheTxRecoveryFuture.java | 506 ++++++++++++++++++
.../distributed/GridCacheTxRecoveryRequest.java | 261 +++++++++
.../GridCacheTxRecoveryResponse.java | 182 +++++++
.../GridDistributedTxRemoteAdapter.java | 2 +-
.../cache/transactions/IgniteInternalTx.java | 5 +-
.../cache/transactions/IgniteTxAdapter.java | 2 +-
.../cache/transactions/IgniteTxHandler.java | 38 +-
.../transactions/IgniteTxLocalAdapter.java | 2 +-
.../cache/transactions/IgniteTxManager.java | 173 ++----
.../resources/META-INF/classnames.properties | 6 +-
.../GridCacheAbstractFailoverSelfTest.java | 4 +-
.../GridCacheAbstractNodeRestartSelfTest.java | 94 ++--
...xOriginatingNodeFailureAbstractSelfTest.java | 2 +-
...rDisabledPrimaryNodeFailureRecoveryTest.java | 31 ++
...rtitionedPrimaryNodeFailureRecoveryTest.java | 31 ++
...woBackupsPrimaryNodeFailureRecoveryTest.java | 37 ++
...ePrimaryNodeFailureRecoveryAbstractTest.java | 533 +++++++++++++++++++
.../GridCachePartitionedNodeRestartTest.java | 4 +-
...ePartitionedOptimisticTxNodeRestartTest.java | 4 +-
.../GridCacheReplicatedNodeRestartSelfTest.java | 2 +
.../testsuites/IgniteCacheRestartTestSuite.java | 4 +-
.../ignite/testsuites/IgniteCacheTestSuite.java | 3 -
.../IgniteCacheTxRecoverySelfTestSuite.java | 4 +
28 files changed, 1737 insertions(+), 1046 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ba210bbb/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
[12/18] incubator-ignite git commit: # ignite-157-2 renamings
Posted by sb...@apache.org.
# ignite-157-2 renamings
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f5f95fb8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f5f95fb8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f5f95fb8
Branch: refs/heads/ignite-841
Commit: f5f95fb8c952996f4479852b1ca2e086d3d57621
Parents: b141abf
Author: sboikov <sb...@gridgain.com>
Authored: Wed May 6 09:56:30 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed May 6 09:56:30 2015 +0300
----------------------------------------------------------------------
.../ignite/codegen/MessageCodeGenerator.java | 4 +-
.../communication/GridIoMessageFactory.java | 4 +-
...ridCacheOptimisticCheckPreparedTxFuture.java | 508 -------------------
...idCacheOptimisticCheckPreparedTxRequest.java | 261 ----------
...dCacheOptimisticCheckPreparedTxResponse.java | 179 -------
.../distributed/GridCacheTxRecoveryFuture.java | 506 ++++++++++++++++++
.../distributed/GridCacheTxRecoveryRequest.java | 261 ++++++++++
.../GridCacheTxRecoveryResponse.java | 182 +++++++
.../cache/transactions/IgniteTxHandler.java | 30 +-
.../cache/transactions/IgniteTxManager.java | 98 +---
.../resources/META-INF/classnames.properties | 6 +-
11 files changed, 976 insertions(+), 1063 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index e37b4f3..0540148 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -165,8 +165,8 @@ public class MessageCodeGenerator {
// gen.generateAndWrite(GridDhtTxFinishRequest.class);
// gen.generateAndWrite(GridDhtTxFinishResponse.class);
//
-// gen.generateAndWrite(GridCacheOptimisticCheckPreparedTxRequest.class);
-// gen.generateAndWrite(GridCacheOptimisticCheckPreparedTxResponse.class);
+// gen.generateAndWrite(GridCacheTxRecoveryRequest.class);
+// gen.generateAndWrite(GridCacheTxRecoveryResponse.class);
// gen.generateAndWrite(GridQueryCancelRequest.class);
// gen.generateAndWrite(GridQueryFailResponse.class);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index a395747..7fe8da8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -166,12 +166,12 @@ public class GridIoMessageFactory implements MessageFactory {
break;
case 16:
- msg = new GridCacheOptimisticCheckPreparedTxRequest();
+ msg = new GridCacheTxRecoveryRequest();
break;
case 17:
- msg = new GridCacheOptimisticCheckPreparedTxResponse();
+ msg = new GridCacheTxRecoveryResponse();
break;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
deleted file mode 100644
index bd3e1cc..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
+++ /dev/null
@@ -1,508 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Future verifying that all remote transactions related to some
- * optimistic transaction were prepared.
- */
-public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
- implements GridCacheFuture<Boolean> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Logger reference. */
- private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
- /** Logger. */
- private static IgniteLogger log;
-
- /** Trackable flag. */
- private boolean trackable = true;
-
- /** Context. */
- private final GridCacheSharedContext<K, V> cctx;
-
- /** Future ID. */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
-
- /** Transaction. */
- private final IgniteInternalTx tx;
-
- /** All involved nodes. */
- private final Map<UUID, ClusterNode> nodes;
-
- /** ID of failed node started transaction. */
- private final UUID failedNodeId;
-
- /** Transaction nodes mapping. */
- private final Map<UUID, Collection<UUID>> txNodes;
-
- /** */
- private final boolean nearTxCheck;
-
- /**
- * @param cctx Context.
- * @param tx Transaction.
- * @param failedNodeId ID of failed node started transaction.
- * @param txNodes Transaction mapping.
- */
- @SuppressWarnings("ConstantConditions")
- public GridCacheOptimisticCheckPreparedTxFuture(GridCacheSharedContext<K, V> cctx,
- IgniteInternalTx tx,
- UUID failedNodeId,
- Map<UUID, Collection<UUID>> txNodes)
- {
- super(cctx.kernalContext(), CU.boolReducer());
-
- this.cctx = cctx;
- this.tx = tx;
- this.txNodes = txNodes;
- this.failedNodeId = failedNodeId;
-
- if (log == null)
- log = U.logger(cctx.kernalContext(), logRef, GridCacheOptimisticCheckPreparedTxFuture.class);
-
- nodes = new GridLeanMap<>();
-
- UUID locNodeId = cctx.localNodeId();
-
- for (Map.Entry<UUID, Collection<UUID>> e : tx.transactionNodes().entrySet()) {
- if (!locNodeId.equals(e.getKey()) && !failedNodeId.equals(e.getKey()) && !nodes.containsKey(e.getKey())) {
- ClusterNode node = cctx.discovery().node(e.getKey());
-
- if (node != null)
- nodes.put(node.id(), node);
- else if (log.isDebugEnabled())
- log.debug("Transaction node left (will ignore) " + e.getKey());
- }
-
- for (UUID nodeId : e.getValue()) {
- if (!locNodeId.equals(nodeId) && !failedNodeId.equals(nodeId) && !nodes.containsKey(nodeId)) {
- ClusterNode node = cctx.discovery().node(nodeId);
-
- if (node != null)
- nodes.put(node.id(), node);
- else if (log.isDebugEnabled())
- log.debug("Transaction node left (will ignore) " + e.getKey());
- }
- }
- }
-
- UUID nearNodeId = tx.eventNodeId();
-
- nearTxCheck = !failedNodeId.equals(nearNodeId) && cctx.discovery().alive(nearNodeId);
- }
-
- /**
- * Initializes future.
- */
- @SuppressWarnings("ConstantConditions")
- public void prepare() {
- if (nearTxCheck) {
- UUID nearNodeId = tx.eventNodeId();
-
- if (cctx.localNodeId().equals(nearNodeId)) {
- IgniteInternalFuture<Boolean> fut = cctx.tm().txCommitted(tx.nearXidVersion());
-
- fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
- @Override public void apply(IgniteInternalFuture<Boolean> fut) {
- try {
- onDone(fut.get());
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
- }
- });
- }
- else {
- MiniFuture fut = new MiniFuture(tx.eventNodeId());
-
- add(fut);
-
- GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(
- tx,
- 0,
- true,
- futureId(),
- fut.futureId());
-
- try {
- cctx.io().send(nearNodeId, req, tx.ioPolicy());
- }
- catch (ClusterTopologyCheckedException e) {
- fut.onNodeLeft();
- }
- catch (IgniteCheckedException e) {
- fut.onError(e);
- }
-
- markInitialized();
- }
-
- return;
- }
-
- // First check transactions on local node.
- int locTxNum = nodeTransactions(cctx.localNodeId());
-
- if (locTxNum > 1) {
- IgniteInternalFuture<Boolean> fut = cctx.tm().txsPreparedOrCommitted(tx.nearXidVersion(), locTxNum);
-
- if (fut == null || fut.isDone()) {
- boolean prepared;
-
- try {
- prepared = fut == null ? true : fut.get();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Check prepared transaction future failed: " + e, e);
-
- prepared = false;
- }
-
- if (!prepared) {
- onDone(false);
-
- markInitialized();
-
- return;
- }
- }
- else {
- fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
- @Override public void apply(IgniteInternalFuture<Boolean> fut) {
- boolean prepared;
-
- try {
- prepared = fut.get();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Check prepared transaction future failed: " + e, e);
-
- prepared = false;
- }
-
- if (!prepared) {
- onDone(false);
-
- markInitialized();
- }
- else
- proceedPrepare();
- }
- });
-
- return;
- }
- }
-
- proceedPrepare();
- }
-
- /**
- * Process prepare after local check.
- */
- private void proceedPrepare() {
- for (Map.Entry<UUID, Collection<UUID>> entry : txNodes.entrySet()) {
- UUID nodeId = entry.getKey();
-
- // Skip left nodes and local node.
- if (!nodes.containsKey(nodeId) && nodeId.equals(cctx.localNodeId()))
- continue;
-
- /*
- * If primary node failed then send message to all backups, otherwise
- * send message only to primary node.
- */
-
- if (nodeId.equals(failedNodeId)) {
- for (UUID id : entry.getValue()) {
- // Skip backup node if it is local node or if it is also was mapped as primary.
- if (txNodes.containsKey(id) || id.equals(cctx.localNodeId()))
- continue;
-
- MiniFuture fut = new MiniFuture(id);
-
- add(fut);
-
- GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(tx,
- nodeTransactions(id),
- false,
- futureId(),
- fut.futureId());
-
- try {
- cctx.io().send(id, req, tx.ioPolicy());
- }
- catch (ClusterTopologyCheckedException ignored) {
- fut.onNodeLeft();
- }
- catch (IgniteCheckedException e) {
- fut.onError(e);
-
- break;
- }
- }
- }
- else {
- MiniFuture fut = new MiniFuture(nodeId);
-
- add(fut);
-
- GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(
- tx,
- nodeTransactions(nodeId),
- false,
- futureId(),
- fut.futureId());
-
- try {
- cctx.io().send(nodeId, req, tx.ioPolicy());
- }
- catch (ClusterTopologyCheckedException ignored) {
- fut.onNodeLeft();
- }
- catch (IgniteCheckedException e) {
- fut.onError(e);
-
- break;
- }
- }
- }
-
- markInitialized();
- }
-
- /**
- * @param nodeId Node ID.
- * @return Number of transactions on node.
- */
- private int nodeTransactions(UUID nodeId) {
- int cnt = txNodes.containsKey(nodeId) ? 1 : 0; // +1 if node is primary.
-
- for (Collection<UUID> backups : txNodes.values()) {
- for (UUID backup : backups) {
- if (backup.equals(nodeId)) {
- cnt++; // +1 if node is backup.
-
- break;
- }
- }
- }
-
- return cnt;
- }
-
- /**
- * @param nodeId Node ID.
- * @param res Response.
- */
- public void onResult(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse res) {
- if (!isDone()) {
- for (IgniteInternalFuture<Boolean> fut : pending()) {
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
-
- if (f.futureId().equals(res.miniId())) {
- assert f.nodeId().equals(nodeId);
-
- f.onResult(res);
-
- break;
- }
- }
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid futureId() {
- return futId;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion version() {
- return tx.xidVersion();
- }
-
- /** {@inheritDoc} */
- @Override public Collection<? extends ClusterNode> nodes() {
- return nodes.values();
- }
-
- /** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- for (IgniteInternalFuture<?> fut : futures())
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
-
- if (f.nodeId().equals(nodeId)) {
- f.onNodeLeft();
-
- return true;
- }
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean trackable() {
- return trackable;
- }
-
- /** {@inheritDoc} */
- @Override public void markNotTrackable() {
- trackable = false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
- if (super.onDone(res, err)) {
- cctx.mvcc().removeFuture(this);
-
- if (err == null) {
- assert res != null;
-
- cctx.tm().finishOptimisticTxOnRecovery(tx, res);
- }
- else {
- if (err instanceof ClusterTopologyCheckedException && nearTxCheck) {
- if (log.isDebugEnabled())
- log.debug("Failed to check transaction on near node, " +
- "ignoring [err=" + err + ", tx=" + tx + ']');
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Failed to check prepared transactions, " +
- "invalidating transaction [err=" + err + ", tx=" + tx + ']');
-
- cctx.tm().salvageTx(tx);
- }
- }
- }
-
- return false;
- }
-
- /**
- * @param f Future.
- * @return {@code True} if mini-future.
- */
- private boolean isMini(IgniteInternalFuture<?> f) {
- return f.getClass().equals(MiniFuture.class);
- }
-
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridCacheOptimisticCheckPreparedTxFuture.class, this, "super", super.toString());
- }
-
- /**
- *
- */
- private class MiniFuture extends GridFutureAdapter<Boolean> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Mini future ID. */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
-
- /** Node ID. */
- private UUID nodeId;
-
- /**
- * @param nodeId Node ID.
- */
- private MiniFuture(UUID nodeId) {
- this.nodeId = nodeId;
- }
-
- /**
- * @return Node ID.
- */
- private UUID nodeId() {
- return nodeId;
- }
-
- /**
- * @return Future ID.
- */
- private IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * @param e Error.
- */
- private void onError(Throwable e) {
- if (log.isDebugEnabled())
- log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
-
- onDone(e);
- }
-
- /**
- */
- private void onNodeLeft() {
- if (log.isDebugEnabled())
- log.debug("Transaction node left grid (will ignore) [fut=" + this + ']');
-
- if (nearTxCheck) {
- // Near and originating nodes left, need initiate tx check.
- cctx.tm().commitIfPrepared(tx);
-
- onDone(new ClusterTopologyCheckedException("Transaction node left grid (will ignore)."));
- }
- else
- onDone(true);
- }
-
- /**
- * @param res Result callback.
- */
- private void onResult(GridCacheOptimisticCheckPreparedTxResponse res) {
- onDone(res.success());
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(MiniFuture.class, this, "done", isDone(), "err", error());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
deleted file mode 100644
index 4f2a1d6..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-
-import java.io.*;
-import java.nio.*;
-
-/**
- * Message sent to check that transactions related to transaction were prepared on remote node.
- */
-public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBaseMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Future ID. */
- private IgniteUuid futId;
-
- /** Mini future ID. */
- private IgniteUuid miniId;
-
- /** Near transaction ID. */
- private GridCacheVersion nearXidVer;
-
- /** Expected number of transactions on node. */
- private int txNum;
-
- /** System transaction flag. */
- private boolean sys;
-
- /** {@code True} if should check only tx on near node. */
- private boolean nearTxCheck;
-
- /**
- * Empty constructor required by {@link Externalizable}
- */
- public GridCacheOptimisticCheckPreparedTxRequest() {
- // No-op.
- }
-
- /**
- * @param tx Transaction.
- * @param txNum Expected number of transactions on remote node.
- * @param nearTxCheck
- * @param futId Future ID.
- * @param miniId Mini future ID.
- */
- public GridCacheOptimisticCheckPreparedTxRequest(IgniteInternalTx tx,
- int txNum,
- boolean nearTxCheck,
- IgniteUuid futId,
- IgniteUuid miniId)
- {
- super(tx.xidVersion(), 0);
-
- nearXidVer = tx.nearXidVersion();
- sys = tx.system();
-
- this.futId = futId;
- this.miniId = miniId;
- this.txNum = txNum;
- this.nearTxCheck = nearTxCheck;
- }
-
- /**
- * @return {@code True} if should check only tx on near node.
- */
- public boolean nearTxCheck() {
- return nearTxCheck;
- }
-
- /**
- * @return Near version.
- */
- public GridCacheVersion nearXidVersion() {
- return nearXidVer;
- }
-
- /**
- * @return Future ID.
- */
- public IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * @return Mini future ID.
- */
- public IgniteUuid miniId() {
- return miniId;
- }
-
- /**
- * @return Expected number of transactions on node.
- */
- public int transactions() {
- return txNum;
- }
-
- /**
- * @return System transaction flag.
- */
- public boolean system() {
- return sys;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 8:
- if (!writer.writeIgniteUuid("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeIgniteUuid("miniId", miniId))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeBoolean("nearTxCheck", nearTxCheck))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeMessage("nearXidVer", nearXidVer))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeBoolean("sys", sys))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeInt("txNum", txNum))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 8:
- futId = reader.readIgniteUuid("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- miniId = reader.readIgniteUuid("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- nearTxCheck = reader.readBoolean("nearTxCheck");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- nearXidVer = reader.readMessage("nearXidVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- sys = reader.readBoolean("sys");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- txNum = reader.readInt("txNum");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public byte directType() {
- return 16;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 14;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridCacheOptimisticCheckPreparedTxRequest.class, this, "super", super.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java
deleted file mode 100644
index bc8c2e0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-
-import java.io.*;
-import java.nio.*;
-
-/**
- * Check prepared transactions response.
- */
-public class GridCacheOptimisticCheckPreparedTxResponse extends GridDistributedBaseMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Future ID. */
- private IgniteUuid futId;
-
- /** Mini future ID. */
- private IgniteUuid miniId;
-
- /** Flag indicating if all remote transactions were prepared. */
- private boolean success;
-
- /**
- * Empty constructor required by {@link Externalizable}
- */
- public GridCacheOptimisticCheckPreparedTxResponse() {
- // No-op.
- }
-
- /**
- * @param txId Transaction ID.
- * @param futId Future ID.
- * @param miniId Mini future ID.
- * @param success {@code True} if all remote transactions were prepared, {@code false} otherwise.
- */
- public GridCacheOptimisticCheckPreparedTxResponse(GridCacheVersion txId, IgniteUuid futId, IgniteUuid miniId,
- boolean success) {
- super(txId, 0);
-
- this.futId = futId;
- this.miniId = miniId;
- this.success = success;
- }
-
- /**
- * @return Future ID.
- */
- public IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * @return Mini future ID.
- */
- public IgniteUuid miniId() {
- return miniId;
- }
-
- /**
- * @return {@code True} if all remote transactions were prepared.
- */
- public boolean success() {
- return success;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 8:
- if (!writer.writeIgniteUuid("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeIgniteUuid("miniId", miniId))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeBoolean("success", success))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 8:
- futId = reader.readIgniteUuid("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- miniId = reader.readIgniteUuid("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- success = reader.readBoolean("success");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public byte directType() {
- return 17;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 11;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridCacheOptimisticCheckPreparedTxResponse.class, this, "super", super.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
new file mode 100644
index 0000000..663ed90
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Future verifying that all remote transactions related to transaction were prepared or committed.
+ */
+public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolean> implements GridCacheFuture<Boolean> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Logger reference. */
+ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Logger. */
+ private static IgniteLogger log;
+
+ /** Trackable flag. */
+ private boolean trackable = true;
+
+ /** Context. */
+ private final GridCacheSharedContext<?, ?> cctx;
+
+ /** Future ID. */
+ private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+ /** Transaction. */
+ private final IgniteInternalTx tx;
+
+ /** All involved nodes. */
+ private final Map<UUID, ClusterNode> nodes;
+
+ /** ID of failed node started transaction. */
+ private final UUID failedNodeId;
+
+ /** Transaction nodes mapping. */
+ private final Map<UUID, Collection<UUID>> txNodes;
+
+ /** */
+ private final boolean nearTxCheck;
+
+ /**
+ * @param cctx Context.
+ * @param tx Transaction.
+ * @param failedNodeId ID of failed node started transaction.
+ * @param txNodes Transaction mapping.
+ */
+ @SuppressWarnings("ConstantConditions")
+ public GridCacheTxRecoveryFuture(GridCacheSharedContext<?, ?> cctx,
+ IgniteInternalTx tx,
+ UUID failedNodeId,
+ Map<UUID, Collection<UUID>> txNodes)
+ {
+ super(cctx.kernalContext(), CU.boolReducer());
+
+ this.cctx = cctx;
+ this.tx = tx;
+ this.txNodes = txNodes;
+ this.failedNodeId = failedNodeId;
+
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, GridCacheTxRecoveryFuture.class);
+
+ nodes = new GridLeanMap<>();
+
+ UUID locNodeId = cctx.localNodeId();
+
+ for (Map.Entry<UUID, Collection<UUID>> e : tx.transactionNodes().entrySet()) {
+ if (!locNodeId.equals(e.getKey()) && !failedNodeId.equals(e.getKey()) && !nodes.containsKey(e.getKey())) {
+ ClusterNode node = cctx.discovery().node(e.getKey());
+
+ if (node != null)
+ nodes.put(node.id(), node);
+ else if (log.isDebugEnabled())
+ log.debug("Transaction node left (will ignore) " + e.getKey());
+ }
+
+ for (UUID nodeId : e.getValue()) {
+ if (!locNodeId.equals(nodeId) && !failedNodeId.equals(nodeId) && !nodes.containsKey(nodeId)) {
+ ClusterNode node = cctx.discovery().node(nodeId);
+
+ if (node != null)
+ nodes.put(node.id(), node);
+ else if (log.isDebugEnabled())
+ log.debug("Transaction node left (will ignore) " + e.getKey());
+ }
+ }
+ }
+
+ UUID nearNodeId = tx.eventNodeId();
+
+ nearTxCheck = !failedNodeId.equals(nearNodeId) && cctx.discovery().alive(nearNodeId);
+ }
+
+ /**
+ * Initializes future.
+ */
+ @SuppressWarnings("ConstantConditions")
+ public void prepare() {
+ if (nearTxCheck) {
+ UUID nearNodeId = tx.eventNodeId();
+
+ if (cctx.localNodeId().equals(nearNodeId)) {
+ IgniteInternalFuture<Boolean> fut = cctx.tm().txCommitted(tx.nearXidVersion());
+
+ fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+ try {
+ onDone(fut.get());
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+ });
+ }
+ else {
+ MiniFuture fut = new MiniFuture(tx.eventNodeId());
+
+ add(fut);
+
+ GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(
+ tx,
+ 0,
+ true,
+ futureId(),
+ fut.futureId());
+
+ try {
+ cctx.io().send(nearNodeId, req, tx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException e) {
+ fut.onNodeLeft();
+ }
+ catch (IgniteCheckedException e) {
+ fut.onError(e);
+ }
+
+ markInitialized();
+ }
+
+ return;
+ }
+
+ // First check transactions on local node.
+ int locTxNum = nodeTransactions(cctx.localNodeId());
+
+ if (locTxNum > 1) {
+ IgniteInternalFuture<Boolean> fut = cctx.tm().txsPreparedOrCommitted(tx.nearXidVersion(), locTxNum);
+
+ if (fut == null || fut.isDone()) {
+ boolean prepared;
+
+ try {
+ prepared = fut == null ? true : fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Check prepared transaction future failed: " + e, e);
+
+ prepared = false;
+ }
+
+ if (!prepared) {
+ onDone(false);
+
+ markInitialized();
+
+ return;
+ }
+ }
+ else {
+ fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+ boolean prepared;
+
+ try {
+ prepared = fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Check prepared transaction future failed: " + e, e);
+
+ prepared = false;
+ }
+
+ if (!prepared) {
+ onDone(false);
+
+ markInitialized();
+ }
+ else
+ proceedPrepare();
+ }
+ });
+
+ return;
+ }
+ }
+
+ proceedPrepare();
+ }
+
+ /**
+ * Process prepare after local check.
+ * <p>
+ * Walks the primary-to-backups mapping in {@code txNodes} and sends a recovery request per target:
+ * to every eligible backup when the primary is the failed node, otherwise to the primary itself.
+ * A mini-future is added for every request before it is sent. Once the loop ends the future is
+ * marked as initialized.
+ * <p>
+ * NOTE(review): the skip condition below joins its two checks with {@code &&}, while its comment
+ * ("left nodes and local node") and the backup filter further down read like {@code ||}. Whether
+ * this is intended depends on what {@code nodes} contains - confirm before changing.
+ */
+ private void proceedPrepare() {
+ for (Map.Entry<UUID, Collection<UUID>> entry : txNodes.entrySet()) {
+ UUID nodeId = entry.getKey(); // Primary node of this mapping.
+
+ // Skip left nodes and local node.
+ if (!nodes.containsKey(nodeId) && nodeId.equals(cctx.localNodeId())) // As written: skips only a local node that is absent from 'nodes'.
+ continue;
+
+ /*
+ * If primary node failed then send message to all backups, otherwise
+ * send message only to primary node.
+ */
+
+ if (nodeId.equals(failedNodeId)) {
+ for (UUID id : entry.getValue()) {
+ // Skip backup node if it is local node or if it is also was mapped as primary.
+ if (txNodes.containsKey(id) || id.equals(cctx.localNodeId()))
+ continue;
+
+ MiniFuture fut = new MiniFuture(id);
+
+ add(fut);
+
+ GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(tx,
+ nodeTransactions(id), // txNum
+ false, // nearTxCheck
+ futureId(),
+ fut.futureId());
+
+ try {
+ cctx.io().send(id, req, tx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ fut.onNodeLeft(); // Backup is gone: the mini-future decides how to complete.
+ }
+ catch (IgniteCheckedException e) {
+ fut.onError(e);
+
+ break; // Leaves only the backups loop; the outer loop over primaries continues.
+ }
+ }
+ }
+ else {
+ MiniFuture fut = new MiniFuture(nodeId);
+
+ add(fut);
+
+ GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(
+ tx,
+ nodeTransactions(nodeId), // txNum
+ false, // nearTxCheck
+ futureId(),
+ fut.futureId());
+
+ try {
+ cctx.io().send(nodeId, req, tx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ fut.onNodeLeft(); // Primary is gone: the mini-future decides how to complete.
+ }
+ catch (IgniteCheckedException e) {
+ fut.onError(e);
+
+ break; // Leaves the loop over primaries: no further requests are sent.
+ }
+ }
+ }
+
+ markInitialized();
+ }
+
+ /**
+ * Counts how many transactions are expected on the given node: one if the node is mapped
+ * as a primary, plus one for every primary whose backup list mentions the node.
+ *
+ * @param nodeId Node ID.
+ * @return Number of transactions on node.
+ */
+ private int nodeTransactions(UUID nodeId) {
+ int total = 0;
+
+ // A primary mapping for the node counts as one transaction.
+ if (txNodes.containsKey(nodeId))
+ total++;
+
+ // Each backup list counts at most once, however often the node is listed in it.
+ for (Collection<UUID> backupIds : txNodes.values()) {
+ boolean listed = false;
+
+ for (UUID backupId : backupIds) {
+ if (backupId.equals(nodeId)) {
+ listed = true;
+
+ break;
+ }
+ }
+
+ if (listed)
+ total++;
+ }
+
+ return total;
+ }
+
+ /**
+ * Hands a recovery response to the pending mini-future it was issued for. Nothing happens
+ * when this future is already done or no pending mini-future carries the response's mini ID.
+ *
+ * @param nodeId Node ID.
+ * @param res Response.
+ */
+ public void onResult(UUID nodeId, GridCacheTxRecoveryResponse res) {
+ if (isDone())
+ return;
+
+ for (IgniteInternalFuture<Boolean> pendingFut : pending()) {
+ if (!isMini(pendingFut))
+ continue;
+
+ MiniFuture mini = (MiniFuture)pendingFut;
+
+ if (!mini.futureId().equals(res.miniId()))
+ continue;
+
+ // The response has to come from the node the mini-future was created for.
+ assert mini.nodeId().equals(nodeId);
+
+ mini.onResult(res);
+
+ return;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns the {@code futId} field of this future; the same value is placed into the
+ * recovery requests this future creates.
+ */
+ @Override public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns the xid version of the transaction being recovered ({@code tx.xidVersion()}).
+ */
+ @Override public GridCacheVersion version() {
+ return tx.xidVersion();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns the values of the {@code nodes} map as a live view, not a copy.
+ */
+ @Override public Collection<? extends ClusterNode> nodes() {
+ return nodes.values();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Notifies the first mini-future created for the departed node and reports whether one was found.
+ */
+ @Override public boolean onNodeLeft(UUID nodeId) {
+ for (IgniteInternalFuture<?> candidate : futures()) {
+ if (!isMini(candidate))
+ continue;
+
+ MiniFuture mini = (MiniFuture)candidate;
+
+ if (!mini.nodeId().equals(nodeId))
+ continue;
+
+ mini.onNodeLeft();
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns the current value of the {@code trackable} flag.
+ */
+ @Override public boolean trackable() {
+ return trackable;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Clears the {@code trackable} flag; nothing in this method sets it back.
+ */
+ @Override public void markNotTrackable() {
+ trackable = false;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Only the call that actually completes the future (that is, {@code super.onDone(res, err)}
+ * returned {@code true}) runs the recovery action:
+ * <ul>
+ * <li>the future is removed from the MVCC manager;</li>
+ * <li>without an error the transaction is finished through {@code finishTxOnRecovery(tx, res)};</li>
+ * <li>a {@link ClusterTopologyCheckedException} during a near-node check is only logged;</li>
+ * <li>any other error salvages the transaction.</li>
+ * </ul>
+ *
+ * @param res Check result; must not be {@code null} when {@code err} is {@code null}.
+ * @param err Error, or {@code null} on normal completion.
+ * @return {@code True} if this call completed the future, {@code false} if it was already done.
+ */
+ @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
+ // Somebody else has already completed the future: the recovery action must not run twice.
+ if (!super.onDone(res, err))
+ return false;
+
+ cctx.mvcc().removeFuture(this);
+
+ if (err == null) {
+ assert res != null;
+
+ cctx.tm().finishTxOnRecovery(tx, res);
+ }
+ else if (err instanceof ClusterTopologyCheckedException && nearTxCheck) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to check transaction on near node, " +
+ "ignoring [err=" + err + ", tx=" + tx + ']');
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Failed to check prepared transactions, " +
+ "invalidating transaction [err=" + err + ", tx=" + tx + ']');
+
+ cctx.tm().salvageTx(tx);
+ }
+
+ // Report the state change to the caller instead of unconditionally answering false.
+ return true;
+ }
+
+ /**
+ * Tells whether the given future is exactly a {@code MiniFuture} (subclasses do not qualify).
+ *
+ * @param f Future.
+ * @return {@code True} if mini-future.
+ */
+ private boolean isMini(IgniteInternalFuture<?> f) {
+ Class<?> actualCls = f.getClass();
+
+ return actualCls == MiniFuture.class;
+ }
+
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Built with {@code S.toString}, with the superclass representation appended under the name "super".
+ */
+ @Override public String toString() {
+ return S.toString(GridCacheTxRecoveryFuture.class, this, "super", super.toString());
+ }
+
+ /**
+ * Future for a single recovery request sent to one node. It is an inner (non-static) class:
+ * it reads {@code nearTxCheck}, {@code cctx}, {@code tx} and {@code log} of the enclosing future.
+ * Its own ID is what requests carry as the mini future ID.
+ */
+ private class MiniFuture extends GridFutureAdapter<Boolean> {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /** Mini future ID. */
+ private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+ /** Node ID. */
+ private UUID nodeId;
+
+ /**
+ * @param nodeId Node ID.
+ */
+ private MiniFuture(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ private UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ private IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * Completes this mini-future with the given error, logging it at debug level first.
+ *
+ * @param e Error.
+ */
+ private void onError(Throwable e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
+
+ onDone(e);
+ }
+
+ /**
+ * Handles the departure of the node this mini-future was created for. In near-check mode a
+ * {@code commitIfPrepared} check is started for the transaction and the mini-future fails with a
+ * {@link ClusterTopologyCheckedException}; otherwise the mini-future completes with {@code true}.
+ */
+ private void onNodeLeft() {
+ if (log.isDebugEnabled())
+ log.debug("Transaction node left grid (will ignore) [fut=" + this + ']');
+
+ if (nearTxCheck) {
+ // Near and originating nodes left, need initiate tx check.
+ cctx.tm().commitIfPrepared(tx);
+
+ onDone(new ClusterTopologyCheckedException("Transaction node left grid (will ignore)."));
+ }
+ else
+ onDone(true); // A departed node is counted as a positive answer.
+ }
+
+ /**
+ * Completes this mini-future with the success flag of the response.
+ *
+ * @param res Result callback.
+ */
+ private void onResult(GridCacheTxRecoveryResponse res) {
+ onDone(res.success());
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MiniFuture.class, this, "done", isDone(), "err", error());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
new file mode 100644
index 0000000..259c288
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * Message sent to check that transactions related to the given transaction were prepared on a remote node.
+ * <p>
+ * On top of what the superclass writes, the message carries the IDs of the requesting future and
+ * mini-future, the near transaction version, the number of transactions expected on the receiving
+ * node, the system-transaction flag and the near-only check flag. These six fields are written and
+ * read in the same order, under writer/reader states 8 to 13.
+ */
+public class GridCacheTxRecoveryRequest extends GridDistributedBaseMessage {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Mini future ID. */
+ private IgniteUuid miniId;
+
+ /** Near transaction ID. */
+ private GridCacheVersion nearXidVer;
+
+ /** Expected number of transactions on node. */
+ private int txNum;
+
+ /** System transaction flag. */
+ private boolean sys;
+
+ /** {@code True} if should check only tx on near node. */
+ private boolean nearTxCheck;
+
+ /**
+ * Empty constructor required by {@link Externalizable}
+ */
+ public GridCacheTxRecoveryRequest() {
+ // No-op.
+ }
+
+ /**
+ * Creates a request for the given transaction. The near xid version and the system flag are
+ * taken from the transaction; the remaining fields come from the arguments.
+ *
+ * @param tx Transaction.
+ * @param txNum Expected number of transactions on remote node.
+ * @param nearTxCheck {@code True} if should check only tx on near node.
+ * @param futId Future ID.
+ * @param miniId Mini future ID.
+ */
+ public GridCacheTxRecoveryRequest(IgniteInternalTx tx,
+ int txNum,
+ boolean nearTxCheck,
+ IgniteUuid futId,
+ IgniteUuid miniId)
+ {
+ super(tx.xidVersion(), 0); // NOTE(review): meaning of the second argument (0) is defined by the superclass - confirm.
+
+ nearXidVer = tx.nearXidVersion();
+ sys = tx.system();
+
+ this.futId = futId;
+ this.miniId = miniId;
+ this.txNum = txNum;
+ this.nearTxCheck = nearTxCheck;
+ }
+
+ /**
+ * @return {@code True} if should check only tx on near node.
+ */
+ public boolean nearTxCheck() {
+ return nearTxCheck;
+ }
+
+ /**
+ * @return Near version.
+ */
+ public GridCacheVersion nearXidVersion() {
+ return nearXidVer;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Mini future ID.
+ */
+ public IgniteUuid miniId() {
+ return miniId;
+ }
+
+ /**
+ * @return Expected number of transactions on node.
+ */
+ public int transactions() {
+ return txNum;
+ }
+
+ /**
+ * @return System transaction flag.
+ */
+ public boolean system() {
+ return sys;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes the superclass part first, then the header if it has not been written yet, then the
+ * fields of this class starting from the state the writer currently reports. Returns
+ * {@code false} as soon as any step reports failure, {@code true} once the last field is written.
+ */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) { // No breaks below: after a field is written the next case runs in the same call.
+ case 8: // First field of this class; lower states are presumably consumed by the superclass - confirm.
+ if (!writer.writeIgniteUuid("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeIgniteUuid("miniId", miniId))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
+ if (!writer.writeBoolean("nearTxCheck", nearTxCheck))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
+ if (!writer.writeMessage("nearXidVer", nearXidVer))
+ return false;
+
+ writer.incrementState();
+
+ case 12:
+ if (!writer.writeBoolean("sys", sys))
+ return false;
+
+ writer.incrementState();
+
+ case 13:
+ if (!writer.writeInt("txNum", txNum))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Mirror of {@code writeTo}: reads the superclass part, then the fields of this class in the
+ * same order, starting from the state the reader currently reports. Returns {@code false} as
+ * soon as the reader reports that the last read did not complete.
+ */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) { // No breaks below: after a field is read the next case runs in the same call.
+ case 8:
+ futId = reader.readIgniteUuid("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ miniId = reader.readIgniteUuid("miniId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 10:
+ nearTxCheck = reader.readBoolean("nearTxCheck");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
+ nearXidVer = reader.readMessage("nearXidVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 12:
+ sys = reader.readBoolean("sys");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 13:
+ txNum = reader.readInt("txNum");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 16; // NOTE(review): presumably has to be unique among message types - confirm against the message factory.
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 14; // Highest state handled in writeTo/readFrom (13) plus one.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheTxRecoveryRequest.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
new file mode 100644
index 0000000..e5c026a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * Transactions recovery check response.
+ * <p>
+ * On top of what the superclass writes, the message carries the IDs of the future and mini-future
+ * the answer is addressed to and the outcome of the check. These three fields are written and read
+ * in the same order, under writer/reader states 8 to 10.
+ */
+public class GridCacheTxRecoveryResponse extends GridDistributedBaseMessage {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Mini future ID. */
+ private IgniteUuid miniId;
+
+ /** Flag indicating if all remote transactions were prepared. */
+ private boolean success;
+
+ /**
+ * Empty constructor required by {@link Externalizable}
+ */
+ public GridCacheTxRecoveryResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param txId Transaction ID.
+ * @param futId Future ID.
+ * @param miniId Mini future ID.
+ * @param success {@code True} if all remote transactions were prepared, {@code false} otherwise.
+ */
+ public GridCacheTxRecoveryResponse(GridCacheVersion txId,
+ IgniteUuid futId,
+ IgniteUuid miniId,
+ boolean success)
+ {
+ super(txId, 0); // NOTE(review): meaning of the second argument (0) is defined by the superclass - confirm.
+
+ this.futId = futId;
+ this.miniId = miniId;
+ this.success = success;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Mini future ID.
+ */
+ public IgniteUuid miniId() {
+ return miniId;
+ }
+
+ /**
+ * @return {@code True} if all remote transactions were prepared.
+ */
+ public boolean success() {
+ return success;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes the superclass part first, then the header if it has not been written yet, then the
+ * fields of this class starting from the state the writer currently reports. Returns
+ * {@code false} as soon as any step reports failure, {@code true} once the last field is written.
+ */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) { // No breaks below: after a field is written the next case runs in the same call.
+ case 8: // First field of this class; lower states are presumably consumed by the superclass - confirm.
+ if (!writer.writeIgniteUuid("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeIgniteUuid("miniId", miniId))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
+ if (!writer.writeBoolean("success", success))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Mirror of {@code writeTo}: reads the superclass part, then the fields of this class in the
+ * same order, starting from the state the reader currently reports. Returns {@code false} as
+ * soon as the reader reports that the last read did not complete.
+ */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) { // No breaks below: after a field is read the next case runs in the same call.
+ case 8:
+ futId = reader.readIgniteUuid("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ miniId = reader.readIgniteUuid("miniId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 10:
+ success = reader.readBoolean("success");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 17; // NOTE(review): presumably has to be unique among message types - confirm against the message factory.
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 11; // Highest state handled in writeTo/readFrom (10) plus one.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheTxRecoveryResponse.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 2897e30..af75fb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -119,16 +119,16 @@ public class IgniteTxHandler {
}
});
- ctx.io().addHandler(0, GridCacheOptimisticCheckPreparedTxRequest.class,
- new CI2<UUID, GridCacheOptimisticCheckPreparedTxRequest>() {
- @Override public void apply(UUID nodeId, GridCacheOptimisticCheckPreparedTxRequest req) {
+ ctx.io().addHandler(0, GridCacheTxRecoveryRequest.class,
+ new CI2<UUID, GridCacheTxRecoveryRequest>() {
+ @Override public void apply(UUID nodeId, GridCacheTxRecoveryRequest req) {
processCheckPreparedTxRequest(nodeId, req);
}
});
- ctx.io().addHandler(0, GridCacheOptimisticCheckPreparedTxResponse.class,
- new CI2<UUID, GridCacheOptimisticCheckPreparedTxResponse>() {
- @Override public void apply(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse res) {
+ ctx.io().addHandler(0, GridCacheTxRecoveryResponse.class,
+ new CI2<UUID, GridCacheTxRecoveryResponse>() {
+ @Override public void apply(UUID nodeId, GridCacheTxRecoveryResponse res) {
processCheckPreparedTxResponse(nodeId, res);
}
});
@@ -138,6 +138,7 @@ public class IgniteTxHandler {
* @param nearNodeId Near node ID that initiated transaction.
* @param locTx Optional local transaction.
* @param req Near prepare request.
+ * @param completeCb Completion callback.
* @return Future for transaction.
*/
public IgniteInternalFuture<IgniteInternalTx> prepareTx(
@@ -170,6 +171,7 @@ public class IgniteTxHandler {
*
* @param locTx Local transaction.
* @param req Near prepare request.
+ * @param completeCb Completion callback.
* @return Prepare future.
*/
private IgniteInternalFuture<IgniteInternalTx> prepareColocatedTx(
@@ -177,7 +179,6 @@ public class IgniteTxHandler {
final GridNearTxPrepareRequest req,
final IgniteInClosure<GridNearTxPrepareResponse> completeCb
) {
-
IgniteInternalFuture<Object> fut = new GridFinishedFuture<>(); // TODO force preload keys.
return new GridEmbeddedFuture<>(
@@ -223,6 +224,7 @@ public class IgniteTxHandler {
*
* @param nearNodeId Near node ID that initiated transaction.
* @param req Near prepare request.
+ * @param completeCb Completion callback.
* @return Prepare future.
*/
private IgniteInternalFuture<IgniteInternalTx> prepareNearTx(
@@ -442,6 +444,7 @@ public class IgniteTxHandler {
/**
* @param nodeId Node ID.
+ * @param locTx Local transaction.
* @param req Request.
* @return Future.
*/
@@ -1099,6 +1102,7 @@ public class IgniteTxHandler {
}
/**
+ * @param cacheCtx Context.
* @param key Key
* @param ver Version.
* @throws IgniteCheckedException If invalidate failed.
@@ -1183,7 +1187,7 @@ public class IgniteTxHandler {
* @param req Request.
*/
protected void processCheckPreparedTxRequest(final UUID nodeId,
- final GridCacheOptimisticCheckPreparedTxRequest req)
+ final GridCacheTxRecoveryRequest req)
{
if (log.isDebugEnabled())
log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']');
@@ -1231,10 +1235,10 @@ public class IgniteTxHandler {
* @param prepared {@code True} if all transaction prepared or committed.
*/
private void sendCheckPreparedResponse(UUID nodeId,
- GridCacheOptimisticCheckPreparedTxRequest req,
+ GridCacheTxRecoveryRequest req,
boolean prepared) {
- GridCacheOptimisticCheckPreparedTxResponse res =
- new GridCacheOptimisticCheckPreparedTxResponse(req.version(), req.futureId(), req.miniId(), prepared);
+ GridCacheTxRecoveryResponse res =
+ new GridCacheTxRecoveryResponse(req.version(), req.futureId(), req.miniId(), prepared);
try {
if (log.isDebugEnabled())
@@ -1256,11 +1260,11 @@ public class IgniteTxHandler {
* @param nodeId Node ID.
* @param res Response.
*/
- protected void processCheckPreparedTxResponse(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse res) {
+ protected void processCheckPreparedTxResponse(UUID nodeId, GridCacheTxRecoveryResponse res) {
if (log.isDebugEnabled())
log.debug("Processing check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']');
- GridCacheOptimisticCheckPreparedTxFuture fut = (GridCacheOptimisticCheckPreparedTxFuture)ctx.mvcc().
+ GridCacheTxRecoveryFuture fut = (GridCacheTxRecoveryFuture)ctx.mvcc().
<Boolean>future(res.version(), res.futureId());
if (fut == null) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 85b3ad0..8a1d490 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1931,40 +1931,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
- * Gets local transaction for pessimistic tx recovery.
- *
- * @param nearXidVer Near tx ID.
- * @return Near local or colocated local transaction.
- */
- @Nullable public IgniteInternalTx localTxForRecovery(GridCacheVersion nearXidVer, boolean markFinalizing) {
- // First check if we have near transaction with this ID.
- IgniteInternalTx tx = idMap.get(nearXidVer);
-
- if (tx == null) {
- // Check all local transactions and mark them as waiting for recovery to prevent finish race.
- for (IgniteInternalTx txEx : idMap.values()) {
- if (nearXidVer.equals(txEx.nearXidVersion())) {
- if (!markFinalizing || !txEx.markFinalizing(RECOVERY_WAIT))
- tx = txEx;
- }
- }
- }
-
- // Either we found near transaction or one of transactions is being committed by user.
- // Wait for it and send reply.
- if (tx != null && tx.local())
- return tx;
-
- return null;
- }
-
- /**
* Commits or rolls back prepared transaction.
*
* @param tx Transaction.
* @param commit Whether transaction should be committed or rolled back.
*/
- public void finishOptimisticTxOnRecovery(final IgniteInternalTx tx, boolean commit) {
+ public void finishTxOnRecovery(final IgniteInternalTx tx, boolean commit) {
if (log.isDebugEnabled())
log.debug("Finishing prepared transaction [tx=" + tx + ", commit=" + commit + ']');
@@ -1989,71 +1961,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
- * Commits or rolls back pessimistic transaction.
- *
- * @param tx Transaction to finish.
- * @param commitInfo Commit information.
- */
- public void finishPessimisticTxOnRecovery(final IgniteInternalTx tx, GridCacheCommittedTxInfo commitInfo) {
- if (!tx.markFinalizing(RECOVERY_FINISH)) {
- if (log.isDebugEnabled())
- log.debug("Will not try to finish pessimistic transaction (could not mark as finalizing): " + tx);
-
- return;
- }
-
- if (tx instanceof GridDistributedTxRemoteAdapter) {
- IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx;
-
- rmtTx.doneRemote(tx.xidVersion(),
- Collections.<GridCacheVersion>emptyList(),
- Collections.<GridCacheVersion>emptyList(),
- Collections.<GridCacheVersion>emptyList());
- }
-
- try {
- tx.prepare();
-
- if (commitInfo != null) {
- for (IgniteTxEntry entry : commitInfo.recoveryWrites()) {
- IgniteTxEntry write = tx.writeMap().get(entry.txKey());
-
- if (write != null) {
- GridCacheEntryEx cached = write.cached();
-
- IgniteTxEntry recovered = entry.cleanCopy(write.context());
-
- if (cached == null || cached.detached())
- cached = write.context().cache().entryEx(entry.key(), tx.topologyVersion());
-
- recovered.cached(cached);
-
- tx.writeMap().put(entry.txKey(), recovered);
-
- continue;
- }
-
- // If write was not found, check read.
- IgniteTxEntry read = tx.readMap().remove(entry.txKey());
-
- if (read != null)
- tx.writeMap().put(entry.txKey(), entry);
- }
-
- tx.commitAsync().listen(new CommitListener(tx));
- }
- else
- tx.rollbackAsync();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to prepare pessimistic transaction (will invalidate): " + tx, e);
-
- salvageTx(tx);
- }
- }
-
- /**
- * Commits optimistic transaction in case when node started transaction failed, but all related
+ * Commits transaction in case when node started transaction failed, but all related
* transactions were prepared (invalidates transaction if it is not fully prepared).
*
* @param tx Transaction.
@@ -2063,7 +1971,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
assert !F.isEmpty(tx.transactionNodes()) : tx;
assert tx.nearXidVersion() != null : tx;
- GridCacheOptimisticCheckPreparedTxFuture fut = new GridCacheOptimisticCheckPreparedTxFuture<>(
+ GridCacheTxRecoveryFuture fut = new GridCacheTxRecoveryFuture(
cctx,
tx,
tx.originatingNodeId(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 35495ed..657f4af 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -455,9 +455,9 @@ org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresMa
org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$QueueHeaderPredicate
org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$RemoveSetDataCallable
org.apache.ignite.internal.processors.cache.distributed.GridCacheCommittedTxInfo
-org.apache.ignite.internal.processors.cache.distributed.GridCacheOptimisticCheckPreparedTxFuture$1
-org.apache.ignite.internal.processors.cache.distributed.GridCacheOptimisticCheckPreparedTxRequest
-org.apache.ignite.internal.processors.cache.distributed.GridCacheOptimisticCheckPreparedTxResponse
+org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture$1
+org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest
+org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse
org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest
org.apache.ignite.internal.processors.cache.distributed.GridDistributedBaseMessage
org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter
[14/18] incubator-ignite git commit: #gg-10186: NullPointerException
at CacheDrStateTransferHandler.java:320 #gg-10187: NullPointerException at
GridEntSecurityProcessor.java:263
Posted by sb...@apache.org.
#gg-10186: NullPointerException at CacheDrStateTransferHandler.java:320
#gg-10187: NullPointerException at GridEntSecurityProcessor.java:263
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/587103fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/587103fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/587103fd
Branch: refs/heads/ignite-841
Commit: 587103fdd1273e1d98897a07f98594dac85e38bc
Parents: 99c7e22
Author: ivasilinets <iv...@gridgain.com>
Authored: Wed May 6 12:40:27 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Wed May 6 12:40:27 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheProcessor.java | 109 ++++++++++--------
.../processors/cache/CacheGetFromJobTest.java | 110 +++++++++++++++++++
.../testsuites/IgniteCacheRestartTestSuite.java | 1 +
3 files changed, 174 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/587103fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c0026ab..d22d224 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -124,6 +124,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Must use JDK marshaller since it is used by discovery to fire custom events. */
private Marshaller marshaller = new JdkMarshaller();
+ /** Count down latch for caches. */
+ private final CountDownLatch cacheStartedLatch = new CountDownLatch(1);
+
/**
* @param ctx Kernal context.
*/
@@ -657,87 +660,92 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void onKernalStart() throws IgniteCheckedException {
- if (ctx.config().isDaemon())
- return;
+ try {
+ if (ctx.config().isDaemon())
+ return;
- ClusterNode locNode = ctx.discovery().localNode();
+ ClusterNode locNode = ctx.discovery().localNode();
- // Init cache plugin managers.
- final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>();
+ // Init cache plugin managers.
+ final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>();
- for (DynamicCacheDescriptor desc : registeredCaches.values()) {
- CacheConfiguration locCcfg = desc.cacheConfiguration();
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ CacheConfiguration locCcfg = desc.cacheConfiguration();
- CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg);
+ CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg);
- cache2PluginMgr.put(locCcfg.getName(), pluginMgr);
- }
+ cache2PluginMgr.put(locCcfg.getName(), pluginMgr);
+ }
- if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
- for (ClusterNode n : ctx.discovery().remoteNodes()) {
- checkTransactionConfiguration(n);
+ if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
+ for (ClusterNode n : ctx.discovery().remoteNodes()) {
+ checkTransactionConfiguration(n);
- DeploymentMode locDepMode = ctx.config().getDeploymentMode();
- DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
+ DeploymentMode locDepMode = ctx.config().getDeploymentMode();
+ DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
- CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
- locDepMode, rmtDepMode, true);
+ CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
+ locDepMode, rmtDepMode, true);
- for (DynamicCacheDescriptor desc : registeredCaches.values()) {
- CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id());
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id());
- if (rmtCfg != null) {
- CacheConfiguration locCfg = desc.cacheConfiguration();
+ if (rmtCfg != null) {
+ CacheConfiguration locCfg = desc.cacheConfiguration();
- checkCache(locCfg, rmtCfg, n);
+ checkCache(locCfg, rmtCfg, n);
- // Check plugin cache configurations.
- CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName());
+ // Check plugin cache configurations.
+ CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName());
- assert pluginMgr != null : " Map=" + cache2PluginMgr;
+ assert pluginMgr != null : " Map=" + cache2PluginMgr;
- pluginMgr.validateRemotes(rmtCfg, n);
+ pluginMgr.validateRemotes(rmtCfg, n);
+ }
}
}
}
- }
- // Start dynamic caches received from collect discovery data.
- for (DynamicCacheDescriptor desc : registeredCaches.values()) {
- boolean started = desc.onStart();
+ // Start dynamic caches received from collect discovery data.
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ boolean started = desc.onStart();
- assert started : "Failed to change started flag for locally configured cache: " + desc;
+ assert started : "Failed to change started flag for locally configured cache: " + desc;
- desc.clearRemoteConfigurations();
+ desc.clearRemoteConfigurations();
- CacheConfiguration ccfg = desc.cacheConfiguration();
+ CacheConfiguration ccfg = desc.cacheConfiguration();
- IgnitePredicate filter = ccfg.getNodeFilter();
+ IgnitePredicate filter = ccfg.getNodeFilter();
- if (filter.apply(locNode)) {
- CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
+ if (filter.apply(locNode)) {
+ CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
- CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName());
+ CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName());
- assert pluginMgr != null : " Map=" + cache2PluginMgr;
+ assert pluginMgr != null : " Map=" + cache2PluginMgr;
- GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx);
+ GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx);
- ctx.dynamicDeploymentId(desc.deploymentId());
+ ctx.dynamicDeploymentId(desc.deploymentId());
- sharedCtx.addCacheContext(ctx);
+ sharedCtx.addCacheContext(ctx);
- GridCacheAdapter cache = ctx.cache();
+ GridCacheAdapter cache = ctx.cache();
- String name = ccfg.getName();
+ String name = ccfg.getName();
- caches.put(maskNull(name), cache);
+ caches.put(maskNull(name), cache);
- startCache(cache);
+ startCache(cache);
- jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
+ jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
+ }
}
}
+ finally {
+ cacheStartedLatch.countDown();
+ }
ctx.marshallerContext().onMarshallerCacheStarted(ctx);
@@ -835,6 +843,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void onKernalStop(boolean cancel) {
+ cacheStartedLatch.countDown();
+
if (ctx.config().isDaemon())
return;
@@ -959,6 +969,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * @throws IgniteCheckedException If failed to wait.
+ */
+ public void awaitStarted() throws IgniteCheckedException {
+ U.await(cacheStartedLatch);
+ }
+
+ /**
* @param cache Cache.
* @throws IgniteCheckedException If failed.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/587103fd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetFromJobTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetFromJobTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetFromJobTest.java
new file mode 100644
index 0000000..5859bec
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetFromJobTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.concurrent.atomic.*;
+
+/**
+ * Job tries to get cache during topology change.
+ */
+public class CacheGetFromJobTest extends GridCacheAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTopologyChange() throws Exception {
+ final AtomicReference<Exception> err = new AtomicReference<>();
+
+ final AtomicInteger id = new AtomicInteger(1);
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+ @Override public void apply() {
+ info("Run topology change.");
+
+ try {
+ for (int i = 0; i < 5; i++) {
+ info("Topology change: " + i);
+
+ startGrid(id.getAndIncrement());
+ }
+ }
+ catch (Exception e) {
+ err.set(e);
+
+ log.error("Unexpected exception in topology-change-thread: " + e, e);
+ }
+ }
+ }, 3, "topology-change-thread");
+
+ int cntr = 0;
+
+ while (!fut.isDone()) {
+ grid(0).compute().broadcast(new TestJob());
+
+ cntr++;
+ }
+
+ log.info("Job execution count: " + cntr);
+
+ Exception err0 = err.get();
+
+ if (err0 != null)
+ throw err0;
+ }
+
+ /**
+ * Test job.
+ */
+ private static class TestJob implements IgniteCallable<Object> {
+ /** Ignite. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** */
+ public TestJob() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object call() throws Exception {
+ IgniteCache cache = ignite.cache(null);
+
+ assertNotNull(cache);
+
+ assertEquals(0, cache.localSize());
+
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/587103fd/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
index 0ced1c8..796d138 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
@@ -48,6 +48,7 @@ public class IgniteCacheRestartTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheAtomicPutAllFailoverSelfTest.class);
suite.addTestSuite(IgniteCachePutAllRestartTest.class);
suite.addTestSuite(GridCachePutAllFailoverSelfTest.class);
+ suite.addTestSuite(CacheGetFromJobTest.class);
return suite;
}
[17/18] incubator-ignite git commit: ignite-862: Fixed.
Posted by sb...@apache.org.
ignite-862: Fixed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d4908f24
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d4908f24
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d4908f24
Branch: refs/heads/ignite-841
Commit: d4908f2449a4fde9298f6ca11590e0a94a94c955
Parents: 7be25bd
Author: Artem Shutak <as...@gridgain.com>
Authored: Wed May 6 12:58:08 2015 +0300
Committer: Artem Shutak <as...@gridgain.com>
Committed: Wed May 6 12:58:08 2015 +0300
----------------------------------------------------------------------
.../cache/DynamicCacheDescriptor.java | 16 ++++++++++-
.../processors/cache/GridCacheProcessor.java | 30 ++++++--------------
2 files changed, 24 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d4908f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index d8da9ef..6f6f422 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.plugin.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
@@ -56,13 +58,17 @@ public class DynamicCacheDescriptor {
/** Template configuration flag. */
private boolean template;
+ /** Cache plugin manager. */
+ private final CachePluginManager pluginMgr;
+
/**
* @param cacheCfg Cache configuration.
* @param cacheType Cache type.
* @param template {@code True} if this is template configuration.
* @param deploymentId Deployment ID.
*/
- public DynamicCacheDescriptor(CacheConfiguration cacheCfg,
+ public DynamicCacheDescriptor(GridKernalContext ctx,
+ CacheConfiguration cacheCfg,
CacheType cacheType,
boolean template,
IgniteUuid deploymentId) {
@@ -70,6 +76,7 @@ public class DynamicCacheDescriptor {
this.cacheType = cacheType;
this.template = template;
this.deploymentId = deploymentId;
+ pluginMgr = new CachePluginManager(ctx, cacheCfg);
}
/**
@@ -149,6 +156,13 @@ public class DynamicCacheDescriptor {
}
/**
+ * @return Cache plugin manager.
+ */
+ public CachePluginManager pluginManager() {
+ return pluginMgr;
+ }
+
+ /**
* Sets cancelled flag.
*/
public void onCancelled() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d4908f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index d22d224..2b9a821 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -610,7 +610,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
boolean template = cfg.getName() != null && cfg.getName().endsWith("*");
- DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cfg, cacheType, template, IgniteUuid.randomUuid());
+ DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, cfg, cacheType, template,
+ IgniteUuid.randomUuid());
desc.locallyConfigured(true);
desc.staticallyConfigured(true);
@@ -638,7 +639,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (cfg.getName() == null) { // Use cache configuration with null name as template.
DynamicCacheDescriptor desc0 =
- new DynamicCacheDescriptor(cfg, cacheType, true, IgniteUuid.randomUuid());
+ new DynamicCacheDescriptor(ctx, cfg, cacheType, true, IgniteUuid.randomUuid());
desc0.locallyConfigured(true);
desc0.staticallyConfigured(true);
@@ -666,17 +667,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ClusterNode locNode = ctx.discovery().localNode();
- // Init cache plugin managers.
- final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>();
-
- for (DynamicCacheDescriptor desc : registeredCaches.values()) {
- CacheConfiguration locCcfg = desc.cacheConfiguration();
-
- CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg);
-
- cache2PluginMgr.put(locCcfg.getName(), pluginMgr);
- }
-
if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
for (ClusterNode n : ctx.discovery().remoteNodes()) {
checkTransactionConfiguration(n);
@@ -696,9 +686,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
checkCache(locCfg, rmtCfg, n);
// Check plugin cache configurations.
- CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName());
-
- assert pluginMgr != null : " Map=" + cache2PluginMgr;
+ CachePluginManager pluginMgr = desc.pluginManager();
pluginMgr.validateRemotes(rmtCfg, n);
}
@@ -721,9 +709,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (filter.apply(locNode)) {
CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
- CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName());
-
- assert pluginMgr != null : " Map=" + cache2PluginMgr;
+ CachePluginManager pluginMgr = desc.pluginManager();
GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx);
@@ -1657,6 +1643,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (existing == null) {
DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+ ctx,
ccfg,
req.cacheType(),
true,
@@ -1690,6 +1677,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
assert req.cacheType() != null : req;
DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+ ctx,
ccfg,
req.cacheType(),
false,
@@ -2039,7 +2027,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (desc == null) {
DynamicCacheDescriptor templateDesc =
- new DynamicCacheDescriptor(ccfg, req.cacheType(), true, req.deploymentId());
+ new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), true, req.deploymentId());
DynamicCacheDescriptor old = registeredTemplates.put(maskNull(ccfg.getName()), templateDesc);
@@ -2093,7 +2081,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
assert req.cacheType() != null : req;
DynamicCacheDescriptor startDesc =
- new DynamicCacheDescriptor(ccfg, req.cacheType(), false, req.deploymentId());
+ new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId());
DynamicCacheDescriptor old = registeredCaches.put(maskNull(ccfg.getName()), startDesc);