You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/17 13:07:53 UTC
[06/12] incubator-ignite git commit: # Wait for next topology version
before retry, retries for async tx operations
# Wait for next topology version before retry, retries for async tx operations
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/122a9dbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/122a9dbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/122a9dbf
Branch: refs/heads/ignite-426
Commit: 122a9dbf337d5c1128be32d4efee1e0f1dc683f5
Parents: 47895da
Author: sboikov <sb...@gridgain.com>
Authored: Fri Aug 14 12:50:06 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Aug 14 13:57:19 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 143 +++++++++++++++++--
.../dht/atomic/GridNearAtomicUpdateFuture.java | 25 +++-
.../IgniteCachePutRetryAbstractSelfTest.java | 94 ++++++++++--
3 files changed, 237 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/122a9dbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 47ede5b..91af352 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2283,8 +2283,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param filter Filter.
* @return Put future.
*/
- public IgniteInternalFuture<Boolean> putAsync(K key, V val,
- @Nullable CacheEntryPredicate... filter) {
+ public IgniteInternalFuture<Boolean> putAsync(K key, V val, @Nullable CacheEntryPredicate... filter) {
final boolean statsEnabled = ctx.config().isStatisticsEnabled();
final long start = statsEnabled ? System.nanoTime() : 0L;
@@ -3975,8 +3974,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
U.addLastCause(e, e1, log);
}
- if (X.hasCause(e, ClusterTopologyCheckedException.class) && i != retries - 1)
+ if (X.hasCause(e, ClusterTopologyCheckedException.class) && i != retries - 1) {
+ AffinityTopologyVersion topVer = tx.topologyVersion();
+
+ assert topVer != null && topVer.topologyVersion() > 0 : tx;
+
+ ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1).get();
+
continue;
+ }
throw e;
}
@@ -4014,18 +4020,36 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx);
- if (tx == null || tx.implicit())
- tx = ctx.tm().newTx(
- true,
- op.single(),
- ctx.systemTx() ? ctx : null,
- OPTIMISTIC,
- READ_COMMITTED,
- ctx.kernalContext().config().getTransactionConfiguration().getDefaultTxTimeout(),
- !ctx.skipStore(),
- 0);
+ if (tx == null || tx.implicit()) {
+ boolean skipStore = ctx.skipStore(); // Save value of thread-local flag.
+
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+ int retries = opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES;
- return asyncOp(tx, op);
+ if (retries == 1) {
+ tx = ctx.tm().newTx(
+ true,
+ op.single(),
+ ctx.systemTx() ? ctx : null,
+ OPTIMISTIC,
+ READ_COMMITTED,
+ ctx.kernalContext().config().getTransactionConfiguration().getDefaultTxTimeout(),
+ !skipStore,
+ 0);
+
+ return asyncOp(tx, op);
+ }
+ else {
+ AsyncOpRetryFuture<T> fut = new AsyncOpRetryFuture<>(op, skipStore, retries);
+
+ fut.execute();
+
+ return fut;
+ }
+ }
+ else
+ return asyncOp(tx, op);
}
/**
@@ -4624,6 +4648,97 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
*
*/
+ private class AsyncOpRetryFuture<T> extends GridFutureAdapter<T> {
+ /** */
+ private AsyncOp<T> op;
+
+ /** */
+ private boolean skipStore;
+
+ /** */
+ private int retries;
+
+ /** */
+ private IgniteTxLocalAdapter tx;
+
+ /**
+ * @param op Operation.
+ * @param skipStore Skip store flag.
+ * @param retries Number of retries.
+ */
+ public AsyncOpRetryFuture(AsyncOp<T> op,
+ boolean skipStore,
+ int retries) {
+ assert retries > 1 : retries;
+
+ this.op = op;
+ this.tx = null;
+ this.skipStore = skipStore;
+ this.retries = retries;
+ }
+
+ /**
+ * @param tx Transaction.
+ */
+ public void execute() {
+ tx = ctx.tm().newTx(
+ true,
+ op.single(),
+ ctx.systemTx() ? ctx : null,
+ OPTIMISTIC,
+ READ_COMMITTED,
+ ctx.kernalContext().config().getTransactionConfiguration().getDefaultTxTimeout(),
+ !skipStore,
+ 0);
+
+ IgniteInternalFuture<T> fut = asyncOp(tx, op);
+
+ fut.listen(new IgniteInClosure<IgniteInternalFuture<T>>() {
+ @Override public void apply(IgniteInternalFuture<T> fut) {
+ try {
+ T res = fut.get();
+
+ onDone(res);
+ }
+ catch (IgniteCheckedException e) {
+ if (X.hasCause(e, ClusterTopologyCheckedException.class) && --retries > 0) {
+ IgniteTxLocalAdapter tx = AsyncOpRetryFuture.this.tx;
+
+ assert tx != null;
+
+ AffinityTopologyVersion topVer = tx.topologyVersion();
+
+ assert topVer != null && topVer.topologyVersion() > 0 : tx;
+
+ IgniteInternalFuture<?> topFut =
+ ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1);
+
+ topFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> topFut) {
+ try {
+ topFut.get();
+
+ execute();
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+ });
+
+ return;
+ }
+
+ onDone(e);
+ }
+ }
+ });
+ }
+ }
+
+ /**
+ *
+ */
private static class PeekModes {
/** */
private boolean near;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/122a9dbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 751c9ba..0498839 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -409,9 +409,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
GridFutureAdapter<Void> fut0;
+ long nextTopVer;
+
synchronized (this) {
mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f);
+ assert topVer != null && topVer.topologyVersion() > 0 : this;
+
+ nextTopVer = topVer.topologyVersion() + 1;
+
topVer = AffinityTopologyVersion.ZERO;
fut0 = topCompleteFut;
@@ -428,7 +434,24 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
updVer = null;
topLocked = false;
- map();
+ IgniteInternalFuture<?> fut = cctx.affinity().affinityReadyFuture(nextTopVer);
+
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(final IgniteInternalFuture<?> fut) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ try {
+ fut.get();
+
+ map();
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+ });
+ }
+ });
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/122a9dbf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index dcba325..9abc5c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.testframework.*;
+import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -76,11 +77,25 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
protected CacheAtomicWriteOrderMode writeOrderMode() {
return CLOCK;
}
-
/**
* @throws Exception If failed.
*/
public void testPut() throws Exception {
+ checkPut(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutAsync() throws Exception {
+ checkPut(true);
+ }
+
+ /**
+ * @param async If {@code true} tests asynchronous put.
+ * @throws Exception If failed.
+ */
+ private void checkPut(boolean async) throws Exception {
final AtomicBoolean finished = new AtomicBoolean();
IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@@ -104,20 +119,67 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
if (atomicityMode() == ATOMIC)
assertEquals(writeOrderMode(), cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode());
- for (int i = 0; i < keysCnt; i++)
- cache.put(i, i);
+ int iter = 0;
+
+ long stopTime = System.currentTimeMillis() + 60_000;
+
+ if (async) {
+ IgniteCache<Object, Object> cache0 = cache.withAsync();
+
+ while (System.currentTimeMillis() < stopTime) {
+ Integer val = ++iter;
+
+ for (int i = 0; i < keysCnt; i++) {
+ cache0.put(i, val);
+
+ cache0.future().get();
+ }
+
+ for (int i = 0; i < keysCnt; i++) {
+ cache0.get(i);
+
+ assertEquals(val, cache0.future().get());
+ }
+ }
+ }
+ else {
+ while (System.currentTimeMillis() < stopTime) {
+ Integer val = ++iter;
+
+ for (int i = 0; i < keysCnt; i++)
+ cache.put(i, val);
+
+ for (int i = 0; i < keysCnt; i++)
+ assertEquals(val, cache.get(i));
+ }
+ }
finished.set(true);
fut.get();
for (int i = 0; i < keysCnt; i++)
- assertEquals(i, ignite(0).cache(null).get(i));
+ assertEquals(iter, cache.get(i));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFailsWithNoRetries() throws Exception {
+ checkFailsWithNoRetries(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFailsWithNoRetriesAsync() throws Exception {
+ checkFailsWithNoRetries(true);
}
/**
+ * @param async If {@code true} tests asynchronous put.
* @throws Exception If failed.
*/
- public void testFailWithNoRetries() throws Exception {
+ private void checkFailsWithNoRetries(boolean async) throws Exception {
final AtomicBoolean finished = new AtomicBoolean();
IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@@ -136,22 +198,34 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
int keysCnt = keysCount();
- boolean exceptionThrown = false;
+ boolean eThrown = false;
+
+ IgniteCache<Object, Object> cache = ignite(0).cache(null).withNoRetries();
+
+ if (async)
+ cache = cache.withAsync();
for (int i = 0; i < keysCnt; i++) {
try {
- ignite(0).cache(null).withNoRetries().put(i, i);
+ if (async) {
+ cache.put(i, i);
+
+ cache.future().get();
+ }
+ else
+ cache.put(i, i);
}
catch (Exception e) {
- assertTrue("Invalid exception: " + e, X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, CachePartialUpdateException.class));
+ assertTrue("Invalid exception: " + e,
+ X.hasCause(e, ClusterTopologyCheckedException.class, CachePartialUpdateException.class));
- exceptionThrown = true;
+ eThrown = true;
break;
}
}
- assertTrue(exceptionThrown);
+ assertTrue(eThrown);
finished.set(true);
fut.get();