You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/21 10:13:20 UTC

[14/34] incubator-ignite git commit: IGNITE-1265 - Added ready future to ClusterTopologyException, added test for correct explicit transaction retries.

IGNITE-1265 - Added ready future to ClusterTopologyException, added test for correct explicit transaction retries.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8f1c1c0d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8f1c1c0d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8f1c1c0d

Branch: refs/heads/master
Commit: 8f1c1c0d7406f9b43681a6da69560c8ba6bbb737
Parents: 8ced207
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Wed Aug 19 22:55:48 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Wed Aug 19 22:55:48 2015 -0700

----------------------------------------------------------------------
 .../cluster/ClusterTopologyException.java       |  18 ++
 .../ClusterTopologyCheckedException.java        |  18 ++
 .../cache/GridCacheSharedContext.java           |  15 ++
 .../colocated/GridDhtColocatedLockFuture.java   |   8 +-
 .../distributed/near/GridNearLockFuture.java    |   8 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   7 +-
 .../GridNearPessimisticTxPrepareFuture.java     |   7 +-
 .../ignite/internal/util/IgniteUtils.java       |   9 +-
 .../IgniteCachePutRetryAbstractSelfTest.java    |   1 +
 ...gniteCachePutRetryTransactionalSelfTest.java | 179 +++++++++++++++++++
 10 files changed, 263 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java
index d28c409..61bc367 100644
--- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java
+++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.cluster;
 
 import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
 /**
@@ -27,6 +28,9 @@ public class ClusterTopologyException extends IgniteException {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Retry ready future. */
+    private transient IgniteFuture<?> readyFut;
+
     /**
      * Creates new topology exception with given error message.
      *
@@ -46,4 +50,18 @@ public class ClusterTopologyException extends IgniteException {
     public ClusterTopologyException(String msg, @Nullable Throwable cause) {
         super(msg, cause);
     }
+
+    /**
+     * @return Retry ready future.
+     */
+    public IgniteFuture<?> retryReadyFuture() {
+        return readyFut;
+    }
+
+    /**
+     * @param readyFut Retry ready future.
+     */
+    public void retryReadyFuture(IgniteFuture<?> readyFut) {
+        this.readyFut = readyFut;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java
index 8f985b4..2d7b0de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.cluster;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
 import org.jetbrains.annotations.*;
 
 /**
@@ -27,6 +28,9 @@ public class ClusterTopologyCheckedException extends IgniteCheckedException {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Next topology version to wait. */
+    private transient IgniteInternalFuture<?> readyFut;
+
     /**
      * Creates new topology exception with given error message.
      *
@@ -46,4 +50,18 @@ public class ClusterTopologyCheckedException extends IgniteCheckedException {
     public ClusterTopologyCheckedException(String msg, @Nullable Throwable cause) {
         super(msg, cause);
     }
+
+    /**
+     * @return Retry ready future.
+     */
+    public IgniteInternalFuture<?> retryReadyFuture() {
+        return readyFut;
+    }
+
+    /**
+     * @param readyFut Retry ready future.
+     */
+    public void retryReadyFuture(IgniteInternalFuture<?> readyFut) {
+        this.readyFut = readyFut;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 7f4daff..f7763ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -444,6 +444,21 @@ public class GridCacheSharedContext<K, V> {
     }
 
     /**
+     * Gets ready future for the next affinity topology version (used in cases when a node leaves grid).
+     *
+     * @param curVer Current topology version (before a node left grid).
+     * @return Ready future.
+     */
+    public IgniteInternalFuture<?> nextAffinityReadyFuture(AffinityTopologyVersion curVer) {
+        if (curVer == null)
+            return null;
+
+        AffinityTopologyVersion nextVer = new AffinityTopologyVersion(curVer.topologyVersion() + 1);
+
+        return exchMgr.affinityReadyFuture(nextVer);
+    }
+
+    /**
      * @param tx Transaction to check.
      * @param activeCacheIds Active cache IDs.
      * @param cacheCtx Cache context.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index c784948..90ca8df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -1125,8 +1125,12 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
      * @return Topology exception with user-friendly message.
      */
     private ClusterTopologyCheckedException newTopologyException(@Nullable Throwable nested, UUID nodeId) {
-        return new ClusterTopologyCheckedException("Failed to acquire lock for keys (primary node left grid, " +
-            "retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
+        ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to acquire lock for keys " +
+            "(primary node left grid, retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
+
+        topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer.get()));
+
+        return topEx;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index b7e0d73..2815194 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -1232,8 +1232,12 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
      * @return Topology exception with user-friendly message.
      */
     private ClusterTopologyCheckedException newTopologyException(@Nullable Throwable nested, UUID nodeId) {
-        return new ClusterTopologyCheckedException("Failed to acquire lock for keys (primary node left grid, " +
-            "retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
+        ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to acquire lock for keys " +
+            "(primary node left grid, retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
+
+        topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer.get()));
+
+        return topEx;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 2b86672..4bb4c67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -100,7 +100,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
                 MiniFuture f = (MiniFuture) fut;
 
                 if (f.node().id().equals(nodeId)) {
-                    f.onResult(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId));
+                    ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " +
+                        nodeId);
+
+                    e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
+                    f.onResult(e);
 
                     found = true;
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 7006114..3d43797 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -72,7 +72,12 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
             MiniFuture f = (MiniFuture)fut;
 
             if (f.node().id().equals(nodeId)) {
-                f.onNodeLeft(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId));
+                ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " +
+                    nodeId);
+
+                e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
+                f.onNodeLeft(e);
 
                 found = true;
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 3366256..e259084 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.mxbean.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.transactions.*;
+import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.io.*;
 import org.apache.ignite.internal.util.ipc.shmem.*;
 import org.apache.ignite.internal.util.lang.*;
@@ -580,7 +581,13 @@ public abstract class IgniteUtils {
 
         m.put(ClusterTopologyCheckedException.class, new C1<IgniteCheckedException, IgniteException>() {
             @Override public IgniteException apply(IgniteCheckedException e) {
-                return new ClusterTopologyException(e.getMessage(), e);
+                ClusterTopologyException topEx = new ClusterTopologyException(e.getMessage(), e);
+
+                ClusterTopologyCheckedException checked = (ClusterTopologyCheckedException)e;
+
+                topEx.retryReadyFuture(new IgniteFutureImpl<>(checked.retryReadyFuture()));
+
+                return topEx;
             }
         });
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index 9abc5c8..6624f8e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -54,6 +54,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
 
         cfg.setAtomicWriteOrderMode(writeOrderMode());
         cfg.setBackups(1);
+        cfg.setRebalanceMode(CacheRebalanceMode.SYNC);
 
         return cfg;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index 91c454a..9a6bb31 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -19,17 +19,29 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.testframework.*;
+import org.apache.ignite.transactions.*;
 
+import javax.cache.*;
+import javax.cache.processor.*;
+import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
 /**
  *
  */
 public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetryAbstractSelfTest {
+    /** */
+    private static final int FACTOR = 1000;
+
     /** {@inheritDoc} */
     @Override protected CacheAtomicityMode atomicityMode() {
         return CacheAtomicityMode.TRANSACTIONAL;
@@ -71,4 +83,171 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
         finished.set(true);
         fut.get();
     }
+
+    /** {@inheritDoc} */
+    public void testExplicitTransactionRetries() throws Exception {
+        final AtomicInteger idx = new AtomicInteger();
+        int threads = 8;
+
+        final AtomicReferenceArray<Exception> err = new AtomicReferenceArray<>(threads);
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+                int th = idx.getAndIncrement();
+                int base = th * FACTOR;
+
+                Ignite ignite = ignite(0);
+                final IgniteCache<Object, Object> cache = ignite.cache(null);
+
+                try {
+                    for (int i = 0; i < FACTOR; i++) {
+                        doInTransaction(ignite, new ProcessCallable(cache, base, i));
+
+                        if (i > 0 && i % 500 == 0)
+                            info("Done: " + i);
+                    }
+                }
+                catch (Exception e) {
+                    err.set(th, e);
+                }
+
+                return null;
+            }
+        }, threads, "tx-runner");
+
+        while (!fut.isDone()) {
+            int stopIdx = ThreadLocalRandom.current().nextInt(2, 4); // Random in [2, 3].
+
+            stopGrid(stopIdx);
+
+            startGrid(stopIdx);
+        }
+
+        for (int i = 0; i < threads; i++) {
+            Exception error = err.get(i);
+
+            if (error != null)
+                throw error;
+        }
+
+        // Verify contents of the cache.
+        for (int g = 0; g < gridCount(); g++) {
+            IgniteCache<Object, Object> cache = ignite(g).cache(null);
+
+            for (int th = 0; th < threads; th++) {
+                int base = th * FACTOR;
+
+                String key = "key-" + base;
+
+                Set<String> set = (Set<String>)cache.get(key);
+
+                assertNotNull("Missing set for key: " + key, set);
+                assertEquals(FACTOR, set.size());
+
+                for (int i = 0; i < FACTOR; i++) {
+                    assertEquals("value-" + i, cache.get("key-" + base + "-" + i));
+                    assertTrue(set.contains("value-" + i));
+                }
+            }
+        }
+    }
+
+    /**
+     * @param ignite Ignite instance.
+     * @param clo Closure.
+     * @return Result of closure execution.
+     * @throws Exception
+     */
+    private <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception {
+        while (true) {
+            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                T res = clo.call();
+
+                tx.commit();
+
+                return res;
+            }
+            catch (CacheException e) {
+                if (e.getCause() instanceof ClusterTopologyException) {
+                    ClusterTopologyException topEx = (ClusterTopologyException)e.getCause();
+
+                    topEx.retryReadyFuture().get();
+                }
+                else
+                    throw e;
+            }
+            catch (ClusterTopologyException e) {
+                IgniteFuture<?> fut = e.retryReadyFuture();
+
+                fut.get();
+            }
+            catch (TransactionRollbackException ignore) {
+                // Safe to retry right away.
+            }
+        }
+    }
+
+    /**
+     * Callable to process inside transaction.
+     */
+    private static class ProcessCallable implements Callable<Void> {
+        /** */
+        private IgniteCache cache;
+
+        /** */
+        private int base;
+
+        /** */
+        private int i;
+
+        /**
+         * @param cache Cache.
+         * @param base Base index.
+         * @param i Iteration index.
+         */
+        private ProcessCallable(IgniteCache<Object, Object> cache, int base, int i) {
+            this.cache = cache;
+            this.base = base;
+            this.i = i;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() throws Exception {
+            ((IgniteCache<String, String>)cache).put("key-" + base + "-" + i, "value-" + i);
+
+            ((IgniteCache<String, Set<String>>)cache).invoke("key-" + base, new AddEntryProcessor("value-" + i));
+
+            return null;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class AddEntryProcessor implements CacheEntryProcessor<String, Set<String>, Void> {
+        /** */
+        private String addVal;
+
+        /**
+         * @param addVal Value to add.
+         */
+        private AddEntryProcessor(String addVal) {
+            this.addVal = addVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<String, Set<String>> entry, Object... arguments) throws EntryProcessorException {
+            Set<String> set = entry.getValue();
+
+            if (set == null)
+                set = new HashSet<>();
+
+            set.add(addVal);
+
+            entry.setValue(set);
+
+            return null;
+        }
+    }
 }