You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/08/18 18:12:21 UTC

[32/46] incubator-ignite git commit: # Properly handle ClusterTopologyServerNotFoundException for retries

# Properly handle ClusterTopologyServerNotFoundException for retries


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2903a29e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2903a29e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2903a29e

Branch: refs/heads/ignite-843
Commit: 2903a29e7a50802617872bfd0fcc3497c4c7785e
Parents: 122a9db
Author: sboikov <sb...@gridgain.com>
Authored: Fri Aug 14 16:22:25 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Aug 14 16:22:25 2015 +0300

----------------------------------------------------------------------
 .../CachePartialUpdateCheckedException.java     | 29 +++++++++++-
 .../processors/cache/GridCacheAdapter.java      | 50 ++++++++++++--------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 48 +++++++++++--------
 .../near/GridNearOptimisticTxPrepareFuture.java |  2 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java |  1 -
 5 files changed, 86 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2903a29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
index 0272b7c..f430d12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.affinity.*;
 
 import java.util.*;
 
@@ -32,6 +33,9 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException {
     /** Failed keys. */
     private final Collection<Object> failedKeys = new ArrayList<>();
 
+    /** Greatest topology version passed to {@code add} so far; {@code null} if none was given. */
+    private AffinityTopologyVersion topVer;
+
     /**
      * @param msg Error message.
      */
@@ -50,13 +54,36 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException {
     /**
      * @param failedKeys Failed keys.
      * @param err Error.
+     * @param topVer Topology version for failed update, may be {@code null}; the greatest version seen is kept.
      */
-    public void add(Collection<?> failedKeys, Throwable err) {
+    public void add(Collection<?> failedKeys, Throwable err, AffinityTopologyVersion topVer) {
+        if (topVer != null) {
+            AffinityTopologyVersion topVer0 = this.topVer;
+
+            if (topVer0 == null || topVer.compareTo(topVer0) > 0)
+                this.topVer = topVer;
+        }
+
         this.failedKeys.addAll(failedKeys);
 
         addSuppressed(err);
     }
 
+    /**
+     * @return Greatest topology version passed to {@code add}, or {@code null} if none was provided.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @param failedKeys Failed keys.
+     * @param err Error; recorded without a topology version (delegates with {@code null}).
+     */
+    public void add(Collection<?> failedKeys, Throwable err) {
+        add(failedKeys, err, null);
+    }
+
     /** {@inheritDoc} */
     @Override public String getMessage() {
         return super.getMessage() + ": " + failedKeys;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2903a29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 91af352..992edd8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3975,13 +3975,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                     }
 
                     if (X.hasCause(e, ClusterTopologyCheckedException.class) && i != retries - 1) {
-                        AffinityTopologyVersion topVer = tx.topologyVersion();
+                        ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
 
-                        assert topVer != null && topVer.topologyVersion() > 0 : tx;
+                        if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+                            AffinityTopologyVersion topVer = tx.topologyVersion();
+
+                            assert topVer != null && topVer.topologyVersion() > 0 : tx;
 
-                        ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1).get();
+                            ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1).get();
 
-                        continue;
+                            continue;
+                        }
                     }
 
                     throw e;
@@ -4702,31 +4706,35 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                     }
                     catch (IgniteCheckedException e) {
                         if (X.hasCause(e, ClusterTopologyCheckedException.class) && --retries > 0) {
-                            IgniteTxLocalAdapter tx = AsyncOpRetryFuture.this.tx;
+                            ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
 
-                            assert tx != null;
+                            if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+                                IgniteTxLocalAdapter tx = AsyncOpRetryFuture.this.tx;
 
-                            AffinityTopologyVersion topVer = tx.topologyVersion();
+                                assert tx != null;
 
-                            assert topVer != null && topVer.topologyVersion() > 0 : tx;
+                                AffinityTopologyVersion topVer = tx.topologyVersion();
 
-                            IgniteInternalFuture<?> topFut =
-                                ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1);
+                                assert topVer != null && topVer.topologyVersion() > 0 : tx;
 
-                            topFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
-                                @Override public void apply(IgniteInternalFuture<?> topFut) {
-                                    try {
-                                        topFut.get();
+                                IgniteInternalFuture<?> topFut =
+                                    ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1);
 
-                                        execute();
-                                    }
-                                    catch (IgniteCheckedException e) {
-                                        onDone(e);
+                                topFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+                                    @Override public void apply(IgniteInternalFuture<?> topFut) {
+                                        try {
+                                            topFut.get();
+
+                                            execute();
+                                        }
+                                        catch (IgniteCheckedException e) {
+                                            onDone(e);
+                                        }
                                     }
-                                }
-                            });
+                                });
 
-                            return;
+                                return;
+                            }
                         }
 
                         onDone(e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2903a29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 0498839..5dc5494 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -275,6 +275,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             if (singleNodeId.equals(nodeId)) {
                 onDone(addFailedKeys(
                     singleReq.keys(),
+                    singleReq.topologyVersion(),
                     new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId)));
 
                 return true;
@@ -286,8 +287,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         GridNearAtomicUpdateRequest req = mappings.get(nodeId);
 
         if (req != null) {
-            addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before response is " +
-                "received: " + nodeId));
+            addFailedKeys(req.keys(),
+                req.topologyVersion(),
+                new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId));
 
             mappings.remove(nodeId);
 
@@ -356,8 +358,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
     /**
      * @param failed Keys to remap.
+     * @param errTopVer Topology version for failed update.
      */
-    private void remap(Collection<?> failed) {
+    private void remap(Collection<?> failed, AffinityTopologyVersion errTopVer) {
+        assert errTopVer != null;
+
         GridCacheVersion futVer0 = futVer;
 
         if (futVer0 == null || cctx.mvcc().removeAtomicFuture(futVer0) == null)
@@ -409,15 +414,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
         GridFutureAdapter<Void> fut0;
 
-        long nextTopVer;
-
         synchronized (this) {
             mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f);
 
             assert topVer != null && topVer.topologyVersion() > 0 : this;
 
-            nextTopVer = topVer.topologyVersion() + 1;
-
             topVer = AffinityTopologyVersion.ZERO;
 
             fut0 = topCompleteFut;
@@ -434,7 +435,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         updVer = null;
         topLocked = false;
 
-        IgniteInternalFuture<?> fut = cctx.affinity().affinityReadyFuture(nextTopVer);
+        IgniteInternalFuture<?> fut = cctx.affinity().affinityReadyFuture(errTopVer.topologyVersion() + 1);
 
         fut.listen(new CI1<IgniteInternalFuture<?>>() {
             @Override public void apply(final IgniteInternalFuture<?> fut) {
@@ -471,15 +472,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             X.hasCause(err, ClusterTopologyCheckedException.class) &&
             storeFuture() &&
             remapCnt.decrementAndGet() > 0) {
+            ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class);
 
-            CachePartialUpdateCheckedException cause = X.cause(err, CachePartialUpdateCheckedException.class);
+            if (!(topErr instanceof  ClusterTopologyServerNotFoundException)) {
+                CachePartialUpdateCheckedException cause = X.cause(err, CachePartialUpdateCheckedException.class);
 
-            if (F.isEmpty(cause.failedKeys()))
-                cause.printStackTrace();
+                assert cause != null && cause.topologyVersion() != null : err;
 
-            remap(cause.failedKeys());
+                remap(cause.failedKeys(), cause.topologyVersion());
 
-            return false;
+                return false;
+            }
         }
 
         if (super.onDone(retval, err)) {
@@ -528,8 +531,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
             updateNear(singleReq, res);
 
-            if (res.error() != null)
-                onDone(res.failedKeys() != null ? addFailedKeys(res.failedKeys(), res.error()) : res.error());
+            if (res.error() != null) {
+                onDone(res.failedKeys() != null ?
+                    addFailedKeys(res.failedKeys(), singleReq.topologyVersion(), res.error()) : res.error());
+            }
             else {
                 if (op == TRANSFORM) {
                     if (ret != null)
@@ -551,7 +556,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 updateNear(req, res);
 
                 if (res.error() != null)
-                    addFailedKeys(req.keys(), res.error());
+                    addFailedKeys(req.keys(), req.topologyVersion(), res.error());
                 else {
                     if (op == TRANSFORM) {
                         assert !req.fastMap();
@@ -1048,7 +1053,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                     onDone(new GridCacheReturn(cctx, true, null, true));
             }
             catch (IgniteCheckedException e) {
-                onDone(addFailedKeys(req.keys(), e));
+                onDone(addFailedKeys(req.keys(), req.topologyVersion(), e));
             }
         }
     }
@@ -1079,7 +1084,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                     cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
                 }
                 catch (IgniteCheckedException e) {
-                    addFailedKeys(req.keys(), e);
+                    addFailedKeys(req.keys(), req.topologyVersion(), e);
 
                     removeMapping(req.nodeId());
                 }
@@ -1135,10 +1140,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
     /**
      * @param failedKeys Failed keys.
+     * @param topVer Topology version for failed update.
      * @param err Error cause.
      * @return Root {@link org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException}.
      */
-    private synchronized IgniteCheckedException addFailedKeys(Collection<KeyCacheObject> failedKeys, Throwable err) {
+    private synchronized IgniteCheckedException addFailedKeys(Collection<KeyCacheObject> failedKeys,
+        AffinityTopologyVersion topVer,
+        Throwable err) {
         CachePartialUpdateCheckedException err0 = this.err;
 
         if (err0 == null)
@@ -1149,7 +1157,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         for (KeyCacheObject key : failedKeys)
             keys.add(key.value(cctx.cacheObjectContext(), false));
 
-        err0.add(keys, err);
+        err0.add(keys, err, topVer);
 
         return err0;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2903a29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 44b7997..2b86672 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -416,7 +416,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
                 GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
 
                 if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) {
-                    onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all " +
+                    onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all " +
                         "partition nodes left the grid): " + cacheCtx.name()));
 
                     return;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2903a29e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 0a8f87c..ff948a1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -5010,7 +5010,6 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
 
         assertThrows(log, new Callable<Object>() {
             @Override public Object call() throws Exception {
-
                 IgniteFuture fut = cache.future().chain(new IgniteClosure<IgniteFuture, Object>() {
                     @Override public Object apply(IgniteFuture o) {
                         return o.get();