You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/17 13:07:53 UTC

[06/12] incubator-ignite git commit: # Wait for next topology version before retry, retries for async tx operations

# Wait for next topology version before retry, retries for async tx operations


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/122a9dbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/122a9dbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/122a9dbf

Branch: refs/heads/ignite-426
Commit: 122a9dbf337d5c1128be32d4efee1e0f1dc683f5
Parents: 47895da
Author: sboikov <sb...@gridgain.com>
Authored: Fri Aug 14 12:50:06 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Aug 14 13:57:19 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 143 +++++++++++++++++--
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  25 +++-
 .../IgniteCachePutRetryAbstractSelfTest.java    |  94 ++++++++++--
 3 files changed, 237 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/122a9dbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 47ede5b..91af352 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2283,8 +2283,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @param filter Filter.
      * @return Put future.
      */
-    public IgniteInternalFuture<Boolean> putAsync(K key, V val,
-                                                  @Nullable CacheEntryPredicate... filter) {
+    public IgniteInternalFuture<Boolean> putAsync(K key, V val, @Nullable CacheEntryPredicate... filter) {
         final boolean statsEnabled = ctx.config().isStatisticsEnabled();
 
         final long start = statsEnabled ? System.nanoTime() : 0L;
@@ -3975,8 +3974,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                         U.addLastCause(e, e1, log);
                     }
 
-                    if (X.hasCause(e, ClusterTopologyCheckedException.class) && i != retries - 1)
+                    if (X.hasCause(e, ClusterTopologyCheckedException.class) && i != retries - 1) {
+                        AffinityTopologyVersion topVer = tx.topologyVersion();
+
+                        assert topVer != null && topVer.topologyVersion() > 0 : tx;
+
+                        ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1).get();
+
                         continue;
+                    }
 
                     throw e;
                 }
@@ -4014,18 +4020,36 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx);
 
-        if (tx == null || tx.implicit())
-            tx = ctx.tm().newTx(
-                true,
-                op.single(),
-                ctx.systemTx() ? ctx : null,
-                OPTIMISTIC,
-                READ_COMMITTED,
-                ctx.kernalContext().config().getTransactionConfiguration().getDefaultTxTimeout(),
-                !ctx.skipStore(),
-                0);
+        if (tx == null || tx.implicit()) {
+            boolean skipStore = ctx.skipStore(); // Save value of thread-local flag.
+
+            CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+            int retries = opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES;
 
-        return asyncOp(tx, op);
+            if (retries == 1) {
+                tx = ctx.tm().newTx(
+                    true,
+                    op.single(),
+                    ctx.systemTx() ? ctx : null,
+                    OPTIMISTIC,
+                    READ_COMMITTED,
+                    ctx.kernalContext().config().getTransactionConfiguration().getDefaultTxTimeout(),
+                    !skipStore,
+                    0);
+
+                return asyncOp(tx, op);
+            }
+            else {
+                AsyncOpRetryFuture<T> fut = new AsyncOpRetryFuture<>(op, skipStore, retries);
+
+                fut.execute();
+
+                return fut;
+            }
+        }
+        else
+            return asyncOp(tx, op);
     }
 
     /**
@@ -4624,6 +4648,97 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /**
      *
      */
+    private class AsyncOpRetryFuture<T> extends GridFutureAdapter<T> { // Retries async operation on topology errors.
+        /** Operation to run; passed to asyncOp() on every attempt. */
+        private AsyncOp<T> op;
+
+        /** Skip store flag; its negation is passed to newTx(). */
+        private boolean skipStore;
+
+        /** Remaining attempts, decremented in the listener. NOTE(review): not volatile - confirm listener threading. */
+        private int retries;
+
+        /** Transaction created by the latest execute() call; read in the listener to get its topology version. */
+        private IgniteTxLocalAdapter tx;
+
+        /**
+         * @param op Operation.
+         * @param skipStore Skip store flag.
+         * @param retries Maximum number of attempts (first one included), must be greater than 1.
+         */
+        public AsyncOpRetryFuture(AsyncOp<T> op,
+            boolean skipStore,
+            int retries) {
+            assert retries > 1 : retries;
+
+            this.op = op;
+            this.tx = null;
+            this.skipStore = skipStore;
+            this.retries = retries;
+        }
+
+        /**
+         * Runs operation in a new transaction; on topology failure waits for next topology version and retries.
+         */
+        public void execute() {
+            tx = ctx.tm().newTx(
+                true,
+                op.single(),
+                ctx.systemTx() ? ctx : null,
+                OPTIMISTIC,
+                READ_COMMITTED,
+                ctx.kernalContext().config().getTransactionConfiguration().getDefaultTxTimeout(),
+                !skipStore,
+                0);
+
+            IgniteInternalFuture<T> fut = asyncOp(tx, op);
+
+            fut.listen(new IgniteInClosure<IgniteInternalFuture<T>>() {
+                @Override public void apply(IgniteInternalFuture<T> fut) {
+                    try {
+                        T res = fut.get();
+
+                        onDone(res);
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (X.hasCause(e, ClusterTopologyCheckedException.class) && --retries > 0) {
+                            IgniteTxLocalAdapter tx = AsyncOpRetryFuture.this.tx;
+
+                            assert tx != null;
+
+                            AffinityTopologyVersion topVer = tx.topologyVersion();
+
+                            assert topVer != null && topVer.topologyVersion() > 0 : tx;
+
+                            IgniteInternalFuture<?> topFut =
+                                ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1);
+
+                            topFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+                                @Override public void apply(IgniteInternalFuture<?> topFut) {
+                                    try {
+                                        topFut.get();
+
+                                        execute(); // Next attempt, starts a fresh transaction.
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        onDone(e); // Waiting for the next topology version failed.
+                                    }
+                                }
+                            });
+
+                            return; // Completed later by the retry or by the failed topology wait.
+                        }
+
+                        onDone(e); // Not a topology error, or no attempts left.
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     *
+     */
     private static class PeekModes {
         /** */
         private boolean near;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/122a9dbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 751c9ba..0498839 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -409,9 +409,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
         GridFutureAdapter<Void> fut0;
 
+        long nextTopVer;
+
         synchronized (this) {
             mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f);
 
+            assert topVer != null && topVer.topologyVersion() > 0 : this;
+
+            nextTopVer = topVer.topologyVersion() + 1;
+
             topVer = AffinityTopologyVersion.ZERO;
 
             fut0 = topCompleteFut;
@@ -428,7 +434,24 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         updVer = null;
         topLocked = false;
 
-        map();
+        IgniteInternalFuture<?> fut = cctx.affinity().affinityReadyFuture(nextTopVer);
+
+        fut.listen(new CI1<IgniteInternalFuture<?>>() {
+            @Override public void apply(final IgniteInternalFuture<?> fut) {
+                cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                    @Override public void run() {
+                        try {
+                            fut.get();
+
+                            map();
+                        }
+                        catch (IgniteCheckedException e) {
+                            onDone(e);
+                        }
+                    }
+                });
+            }
+        });
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/122a9dbf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index dcba325..9abc5c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.testframework.*;
 
+import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
@@ -76,11 +77,25 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
     protected CacheAtomicWriteOrderMode writeOrderMode() {
         return CLOCK;
     }
-
     /**
      * @throws Exception If failed.
      */
     public void testPut() throws Exception {
+        checkPut(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAsync() throws Exception {
+        checkPut(true);
+    }
+
+    /**
+     * @param async If {@code true} tests asynchronous put.
+     * @throws Exception If failed.
+     */
+    private void checkPut(boolean async) throws Exception {
         final AtomicBoolean finished = new AtomicBoolean();
 
         IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@@ -104,20 +119,67 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
         if (atomicityMode() == ATOMIC)
             assertEquals(writeOrderMode(), cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode());
 
-        for (int i = 0; i < keysCnt; i++)
-            cache.put(i, i);
+        int iter = 0;
+
+        long stopTime = System.currentTimeMillis() + 60_000;
+
+        if (async) {
+            IgniteCache<Object, Object> cache0 = cache.withAsync();
+
+            while (System.currentTimeMillis() < stopTime) {
+                Integer val = ++iter;
+
+                for (int i = 0; i < keysCnt; i++) {
+                    cache0.put(i, val);
+
+                    cache0.future().get();
+                }
+
+                for (int i = 0; i < keysCnt; i++) {
+                    cache0.get(i);
+
+                    assertEquals(val, cache0.future().get());
+                }
+            }
+        }
+        else {
+            while (System.currentTimeMillis() < stopTime) {
+                Integer val = ++iter;
+
+                for (int i = 0; i < keysCnt; i++)
+                    cache.put(i, val);
+
+                for (int i = 0; i < keysCnt; i++)
+                    assertEquals(val, cache.get(i));
+            }
+        }
 
         finished.set(true);
         fut.get();
 
         for (int i = 0; i < keysCnt; i++)
-            assertEquals(i, ignite(0).cache(null).get(i));
+            assertEquals(iter, cache.get(i));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFailsWithNoRetries() throws Exception {
+        checkFailsWithNoRetries(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFailsWithNoRetriesAsync() throws Exception {
+        checkFailsWithNoRetries(true);
     }
 
     /**
+     * @param async If {@code true} tests asynchronous put.
      * @throws Exception If failed.
      */
-    public void testFailWithNoRetries() throws Exception {
+    private void checkFailsWithNoRetries(boolean async) throws Exception {
         final AtomicBoolean finished = new AtomicBoolean();
 
         IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@@ -136,22 +198,34 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
 
         int keysCnt = keysCount();
 
-        boolean exceptionThrown = false;
+        boolean eThrown = false;
+
+        IgniteCache<Object, Object> cache = ignite(0).cache(null).withNoRetries();
+
+        if (async)
+            cache = cache.withAsync();
 
         for (int i = 0; i < keysCnt; i++) {
             try {
-                ignite(0).cache(null).withNoRetries().put(i, i);
+                if (async) {
+                    cache.put(i, i);
+
+                    cache.future().get();
+                }
+                else
+                    cache.put(i, i);
             }
             catch (Exception e) {
-                assertTrue("Invalid exception: " + e, X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, CachePartialUpdateException.class));
+                assertTrue("Invalid exception: " + e,
+                    X.hasCause(e, ClusterTopologyCheckedException.class, CachePartialUpdateException.class));
 
-                exceptionThrown = true;
+                eThrown = true;
 
                 break;
             }
         }
 
-        assertTrue(exceptionThrown);
+        assertTrue(eThrown);
 
         finished.set(true);
         fut.get();