You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/15 10:02:11 UTC
ignite git commit: ignite-971 Fix retry for 'checkBackup' tx failure.
Repository: ignite
Updated Branches:
refs/heads/ignite-971 cef56589e -> 1c57db9a5
ignite-971 Fix retry for 'checkBackup' tx failure.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1c57db9a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1c57db9a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1c57db9a
Branch: refs/heads/ignite-971
Commit: 1c57db9a5f81f08c03ad10208a15c2cb352099d6
Parents: cef5658
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 15 10:49:16 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 15 11:01:14 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 21 ++++-----
.../dht/GridDhtPartitionTopologyImpl.java | 9 ++--
.../near/GridNearOptimisticTxPrepareFuture.java | 3 ++
.../near/GridNearTxFinishFuture.java | 32 +++++++++++---
.../cache/transactions/IgniteTxHandler.java | 9 +++-
.../IgniteCachePutRetryAbstractSelfTest.java | 46 ++++++++++++++------
.../dht/IgniteCachePutRetryAtomicSelfTest.java | 2 +-
...gniteCachePutRetryTransactionalSelfTest.java | 16 +++++--
8 files changed, 100 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c57db9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9329e94..1fc94ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4096,21 +4096,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return t;
}
- catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException |
- IgniteTxRollbackCheckedException e) {
+ catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException e) {
throw e;
}
catch (IgniteCheckedException e) {
- try {
- tx.rollback();
+ if (!(e instanceof IgniteTxRollbackCheckedException)) {
+ try {
+ tx.rollback();
- e = new IgniteTxRollbackCheckedException("Transaction has been rolled back: " +
- tx.xid(), e);
- }
- catch (IgniteCheckedException | AssertionError | RuntimeException e1) {
- U.error(log, "Failed to rollback transaction (cache may contain stale locks): " + tx, e1);
+ e = new IgniteTxRollbackCheckedException("Transaction has been rolled back: " +
+ tx.xid(), e);
+ }
+ catch (IgniteCheckedException | AssertionError | RuntimeException e1) {
+ U.error(log, "Failed to rollback transaction (cache may contain stale locks): " + tx, e1);
- U.addLastCause(e, e1, log);
+ U.addLastCause(e, e1, log);
+ }
}
if (X.hasCause(e, ClusterTopologyCheckedException.class) && i != retries - 1) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c57db9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 5e183e9..fcb012f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -546,10 +546,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
* @param updateSeq Update sequence.
* @return Local partition.
*/
- private GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, boolean create, boolean updateSeq) {
- while (true) {
- boolean belongs = cctx.affinity().localNode(p, topVer);
+ private GridDhtLocalPartition localPartition(int p,
+ AffinityTopologyVersion topVer,
+ boolean create,
+ boolean updateSeq) {
+ boolean belongs = create && cctx.affinity().localNode(p, topVer);
+ while (true) {
GridDhtLocalPartition loc = locParts.get(p);
if (loc != null && loc.state() == EVICTED) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c57db9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 3f9decf..2048fdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -234,6 +234,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
/**
* Completeness callback.
+ *
+ * @return {@code True} if future was finished by this call.
*/
private boolean onComplete() {
Throwable err0 = err.get();
@@ -457,6 +459,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
/**
* @param reads Read entries.
* @param writes Write entries.
+ * @throws IgniteCheckedException If failed.
*/
private void prepare(
Iterable<IgniteTxEntry> reads,
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c57db9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 21aaef2..ab6dc3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -404,8 +404,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (backup == null) {
readyNearMappingFromBackup(mapping);
+ ClusterTopologyCheckedException cause =
+ new ClusterTopologyCheckedException("Backup node left grid: " + backupId);
+
+ cause.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
mini.onDone(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
- "(backup has left grid): " + tx.xidVersion()));
+ "(backup has left grid): " + tx.xidVersion(), cause));
}
else if (backup.isLocal()) {
boolean committed = cctx.tm().txHandler().checkDhtRemoteTxCommitted(tx.xidVersion());
@@ -414,9 +419,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (committed)
mini.onDone(tx);
- else
+ else {
+ ClusterTopologyCheckedException cause =
+ new ClusterTopologyCheckedException("Primary node left grid: " + nodeId);
+
+ cause.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
mini.onDone(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
- "(transaction has been rolled back on backup node): " + tx.xidVersion()));
+ "(transaction has been rolled back on backup node): " + tx.xidVersion(), cause));
+ }
}
else {
GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
@@ -731,8 +742,19 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
readyNearMappingFromBackup(m);
- if (res.checkCommittedError() != null)
- onDone(res.checkCommittedError());
+ Throwable err = res.checkCommittedError();
+
+ if (err != null) {
+ if (err instanceof IgniteCheckedException) {
+ ClusterTopologyCheckedException cause =
+ ((IgniteCheckedException)err).getCause(ClusterTopologyCheckedException.class);
+
+ if (cause != null)
+ cause.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+ }
+
+ onDone(err);
+ }
else
onDone(tx);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c57db9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 9efa43a..756672a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1049,6 +1049,7 @@ public class IgniteTxHandler {
*
* @param nodeId Node id that originated finish request.
* @param req Request.
+ * @param committed {@code True} if transaction committed on this node.
*/
protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed) {
if (req.replyRequired()) {
@@ -1057,9 +1058,13 @@ public class IgniteTxHandler {
if (req.checkCommitted()) {
res.checkCommitted(true);
- if (!committed)
+ if (!committed) {
+ ClusterTopologyCheckedException cause =
+ new ClusterTopologyCheckedException("Primary node left grid.");
+
res.checkCommittedError(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
- "(transaction has been rolled back on backup node): " + req.version()));
+ "(transaction has been rolled back on backup node): " + req.version(), cause));
+ }
}
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c57db9a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index 966e75a..c20e231 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -81,10 +81,12 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
/**
* @param memMode Memory mode.
+ * @param store If {@code true} adds cache store.
* @return Cache configuration.
* @throws Exception If failed.
*/
- protected CacheConfiguration cacheConfiguration(TestMemoryMode memMode) throws Exception {
+ @SuppressWarnings("unchecked")
+ protected CacheConfiguration cacheConfiguration(TestMemoryMode memMode, boolean store) throws Exception {
CacheConfiguration cfg = new CacheConfiguration();
cfg.setAtomicityMode(atomicityMode());
@@ -92,8 +94,11 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
cfg.setAtomicWriteOrderMode(writeOrderMode());
cfg.setBackups(1);
cfg.setRebalanceMode(SYNC);
- cfg.setCacheStoreFactory(new TestStoreFactory());
- cfg.setWriteThrough(true);
+
+ if (store) {
+ cfg.setCacheStoreFactory(new TestStoreFactory());
+ cfg.setWriteThrough(true);
+ }
GridTestUtils.setMemoryMode(null, cfg, memMode, 100, 1024);
@@ -154,58 +159,73 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
* @throws Exception If failed.
*/
public void testPut() throws Exception {
- checkRetry(Test.PUT, TestMemoryMode.HEAP);
+ checkRetry(Test.PUT, TestMemoryMode.HEAP, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutStoreEnabled() throws Exception {
+ checkRetry(Test.PUT, TestMemoryMode.HEAP, true);
}
/**
* @throws Exception If failed.
*/
public void testPutAll() throws Exception {
- checkRetry(Test.PUT_ALL, TestMemoryMode.HEAP);
+ checkRetry(Test.PUT_ALL, TestMemoryMode.HEAP, false);
}
/**
* @throws Exception If failed.
*/
public void testPutAsync() throws Exception {
- checkRetry(Test.PUT_ASYNC, TestMemoryMode.HEAP);
+ checkRetry(Test.PUT_ASYNC, TestMemoryMode.HEAP, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutAsyncStoreEnabled() throws Exception {
+ checkRetry(Test.PUT_ASYNC, TestMemoryMode.HEAP, true);
}
/**
* @throws Exception If failed.
*/
public void testInvoke() throws Exception {
- checkRetry(Test.INVOKE, TestMemoryMode.HEAP);
+ checkRetry(Test.INVOKE, TestMemoryMode.HEAP, false);
}
/**
* @throws Exception If failed.
*/
public void testInvokeAll() throws Exception {
- checkRetry(Test.INVOKE_ALL, TestMemoryMode.HEAP);
+ checkRetry(Test.INVOKE_ALL, TestMemoryMode.HEAP, false);
}
/**
* @throws Exception If failed.
*/
public void testInvokeAllOffheapSwap() throws Exception {
- checkRetry(Test.INVOKE_ALL, TestMemoryMode.OFFHEAP_EVICT_SWAP);
+ checkRetry(Test.INVOKE_ALL, TestMemoryMode.OFFHEAP_EVICT_SWAP, false);
}
/**
* @throws Exception If failed.
*/
public void testInvokeAllOffheapTiered() throws Exception {
- checkRetry(Test.INVOKE_ALL, TestMemoryMode.OFFHEAP_TIERED);
+ checkRetry(Test.INVOKE_ALL, TestMemoryMode.OFFHEAP_TIERED, false);
}
/**
* @param test Test type.
* @param memMode Memory mode.
+ * @param store If {@code true} uses cache with store.
* @throws Exception If failed.
*/
- private void checkRetry(Test test, TestMemoryMode memMode) throws Exception {
- ignite(0).createCache(cacheConfiguration(memMode));
+ private void checkRetry(Test test, TestMemoryMode memMode, boolean store) throws Exception {
+ ignite(0).createCache(cacheConfiguration(memMode, store));
final AtomicBoolean finished = new AtomicBoolean();
@@ -393,7 +413,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
* @throws Exception If failed.
*/
private void checkFailsWithNoRetries(boolean async) throws Exception {
- ignite(0).createCache(cacheConfiguration(TestMemoryMode.HEAP));
+ ignite(0).createCache(cacheConfiguration(TestMemoryMode.HEAP, false));
final AtomicBoolean finished = new AtomicBoolean();
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c57db9a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
index 17d0af1..3d7c7d7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
@@ -47,7 +47,7 @@ public class IgniteCachePutRetryAtomicSelfTest extends IgniteCachePutRetryAbstra
* @throws Exception If failed.
*/
public void testPutInsideTransaction() throws Exception {
- ignite(0).createCache(cacheConfiguration(GridTestUtils.TestMemoryMode.HEAP));
+ ignite(0).createCache(cacheConfiguration(GridTestUtils.TestMemoryMode.HEAP, false));
CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c57db9a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index fecb16b..f61faf2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -92,23 +92,31 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
* @throws Exception If failed.
*/
public void testExplicitTransactionRetries() throws Exception {
- explicitTransactionRetries(TestMemoryMode.HEAP);
+ explicitTransactionRetries(TestMemoryMode.HEAP, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testExplicitTransactionRetriesStoreEnabled() throws Exception {
+ explicitTransactionRetries(TestMemoryMode.HEAP, true);
}
/**
* @throws Exception If failed.
*/
public void testExplicitTransactionRetriesOffheapSwap() throws Exception {
- explicitTransactionRetries(TestMemoryMode.OFFHEAP_EVICT_SWAP);
+ explicitTransactionRetries(TestMemoryMode.OFFHEAP_EVICT_SWAP, false);
}
/**
* @param memMode Memory mode.
+ * @param store If {@code true} uses cache with store.
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
- public void explicitTransactionRetries(TestMemoryMode memMode) throws Exception {
- ignite(0).createCache(cacheConfiguration(memMode));
+ public void explicitTransactionRetries(TestMemoryMode memMode, boolean store) throws Exception {
+ ignite(0).createCache(cacheConfiguration(memMode, store));
final AtomicInteger idx = new AtomicInteger();
int threads = 8;