You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/17 13:05:29 UTC
[08/15] incubator-ignite git commit: # Properly handle ClusterTopologyServerNotFoundException for retries
# Properly handle ClusterTopologyServerNotFoundException for retries
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2903a29e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2903a29e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2903a29e
Branch: refs/heads/ignite-gg-10606
Commit: 2903a29e7a50802617872bfd0fcc3497c4c7785e
Parents: 122a9db
Author: sboikov <sb...@gridgain.com>
Authored: Fri Aug 14 16:22:25 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Aug 14 16:22:25 2015 +0300
----------------------------------------------------------------------
.../CachePartialUpdateCheckedException.java | 29 +++++++++++-
.../processors/cache/GridCacheAdapter.java | 50 ++++++++++++--------
.../dht/atomic/GridNearAtomicUpdateFuture.java | 48 +++++++++++--------
.../near/GridNearOptimisticTxPrepareFuture.java | 2 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 1 -
5 files changed, 86 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2903a29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
index 0272b7c..f430d12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.affinity.*;
import java.util.*;
@@ -32,6 +33,9 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException {
/** Failed keys. */
private final Collection<Object> failedKeys = new ArrayList<>();
+ /** */
+ private AffinityTopologyVersion topVer;
+
/**
* @param msg Error message.
*/
@@ -50,13 +54,36 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException {
/**
* @param failedKeys Failed keys.
* @param err Error.
+ * @param topVer Topology version for failed update.
*/
- public void add(Collection<?> failedKeys, Throwable err) {
+ public void add(Collection<?> failedKeys, Throwable err, AffinityTopologyVersion topVer) {
+ if (topVer != null) {
+ AffinityTopologyVersion topVer0 = this.topVer;
+
+ if (topVer0 == null || topVer.compareTo(topVer0) > 0)
+ this.topVer = topVer;
+ }
+
this.failedKeys.addAll(failedKeys);
addSuppressed(err);
}
+ /**
+ * @return Topology version.
+ */
+ public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @param failedKeys Failed keys.
+ * @param err Error.
+ */
+ public void add(Collection<?> failedKeys, Throwable err) {
+ add(failedKeys, err, null);
+ }
+
/** {@inheritDoc} */
@Override public String getMessage() {
return super.getMessage() + ": " + failedKeys;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2903a29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 91af352..992edd8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3975,13 +3975,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
if (X.hasCause(e, ClusterTopologyCheckedException.class) && i != retries - 1) {
- AffinityTopologyVersion topVer = tx.topologyVersion();
+ ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
- assert topVer != null && topVer.topologyVersion() > 0 : tx;
+ if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+ AffinityTopologyVersion topVer = tx.topologyVersion();
+
+ assert topVer != null && topVer.topologyVersion() > 0 : tx;
- ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1).get();
+ ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1).get();
- continue;
+ continue;
+ }
}
throw e;
@@ -4702,31 +4706,35 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
catch (IgniteCheckedException e) {
if (X.hasCause(e, ClusterTopologyCheckedException.class) && --retries > 0) {
- IgniteTxLocalAdapter tx = AsyncOpRetryFuture.this.tx;
+ ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
- assert tx != null;
+ if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+ IgniteTxLocalAdapter tx = AsyncOpRetryFuture.this.tx;
- AffinityTopologyVersion topVer = tx.topologyVersion();
+ assert tx != null;
- assert topVer != null && topVer.topologyVersion() > 0 : tx;
+ AffinityTopologyVersion topVer = tx.topologyVersion();
- IgniteInternalFuture<?> topFut =
- ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1);
+ assert topVer != null && topVer.topologyVersion() > 0 : tx;
- topFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> topFut) {
- try {
- topFut.get();
+ IgniteInternalFuture<?> topFut =
+ ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1);
- execute();
- }
- catch (IgniteCheckedException e) {
- onDone(e);
+ topFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> topFut) {
+ try {
+ topFut.get();
+
+ execute();
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
}
- }
- });
+ });
- return;
+ return;
+ }
}
onDone(e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2903a29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 0498839..5dc5494 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -275,6 +275,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (singleNodeId.equals(nodeId)) {
onDone(addFailedKeys(
singleReq.keys(),
+ singleReq.topologyVersion(),
new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId)));
return true;
@@ -286,8 +287,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
GridNearAtomicUpdateRequest req = mappings.get(nodeId);
if (req != null) {
- addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before response is " +
- "received: " + nodeId));
+ addFailedKeys(req.keys(),
+ req.topologyVersion(),
+ new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId));
mappings.remove(nodeId);
@@ -356,8 +358,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/**
* @param failed Keys to remap.
+ * @param errTopVer Topology version for failed update.
*/
- private void remap(Collection<?> failed) {
+ private void remap(Collection<?> failed, AffinityTopologyVersion errTopVer) {
+ assert errTopVer != null;
+
GridCacheVersion futVer0 = futVer;
if (futVer0 == null || cctx.mvcc().removeAtomicFuture(futVer0) == null)
@@ -409,15 +414,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
GridFutureAdapter<Void> fut0;
- long nextTopVer;
-
synchronized (this) {
mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f);
assert topVer != null && topVer.topologyVersion() > 0 : this;
- nextTopVer = topVer.topologyVersion() + 1;
-
topVer = AffinityTopologyVersion.ZERO;
fut0 = topCompleteFut;
@@ -434,7 +435,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
updVer = null;
topLocked = false;
- IgniteInternalFuture<?> fut = cctx.affinity().affinityReadyFuture(nextTopVer);
+ IgniteInternalFuture<?> fut = cctx.affinity().affinityReadyFuture(errTopVer.topologyVersion() + 1);
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(final IgniteInternalFuture<?> fut) {
@@ -471,15 +472,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
X.hasCause(err, ClusterTopologyCheckedException.class) &&
storeFuture() &&
remapCnt.decrementAndGet() > 0) {
+ ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class);
- CachePartialUpdateCheckedException cause = X.cause(err, CachePartialUpdateCheckedException.class);
+ if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+ CachePartialUpdateCheckedException cause = X.cause(err, CachePartialUpdateCheckedException.class);
- if (F.isEmpty(cause.failedKeys()))
- cause.printStackTrace();
+ assert cause != null && cause.topologyVersion() != null : err;
- remap(cause.failedKeys());
+ remap(cause.failedKeys(), cause.topologyVersion());
- return false;
+ return false;
+ }
}
if (super.onDone(retval, err)) {
@@ -528,8 +531,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
updateNear(singleReq, res);
- if (res.error() != null)
- onDone(res.failedKeys() != null ? addFailedKeys(res.failedKeys(), res.error()) : res.error());
+ if (res.error() != null) {
+ onDone(res.failedKeys() != null ?
+ addFailedKeys(res.failedKeys(), singleReq.topologyVersion(), res.error()) : res.error());
+ }
else {
if (op == TRANSFORM) {
if (ret != null)
@@ -551,7 +556,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
updateNear(req, res);
if (res.error() != null)
- addFailedKeys(req.keys(), res.error());
+ addFailedKeys(req.keys(), req.topologyVersion(), res.error());
else {
if (op == TRANSFORM) {
assert !req.fastMap();
@@ -1048,7 +1053,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
onDone(new GridCacheReturn(cctx, true, null, true));
}
catch (IgniteCheckedException e) {
- onDone(addFailedKeys(req.keys(), e));
+ onDone(addFailedKeys(req.keys(), req.topologyVersion(), e));
}
}
}
@@ -1079,7 +1084,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
}
catch (IgniteCheckedException e) {
- addFailedKeys(req.keys(), e);
+ addFailedKeys(req.keys(), req.topologyVersion(), e);
removeMapping(req.nodeId());
}
@@ -1135,10 +1140,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/**
* @param failedKeys Failed keys.
+ * @param topVer Topology version for failed update.
* @param err Error cause.
* @return Root {@link org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException}.
*/
- private synchronized IgniteCheckedException addFailedKeys(Collection<KeyCacheObject> failedKeys, Throwable err) {
+ private synchronized IgniteCheckedException addFailedKeys(Collection<KeyCacheObject> failedKeys,
+ AffinityTopologyVersion topVer,
+ Throwable err) {
CachePartialUpdateCheckedException err0 = this.err;
if (err0 == null)
@@ -1149,7 +1157,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
for (KeyCacheObject key : failedKeys)
keys.add(key.value(cctx.cacheObjectContext(), false));
- err0.add(keys, err);
+ err0.add(keys, err, topVer);
return err0;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2903a29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 44b7997..2b86672 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -416,7 +416,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) {
- onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all " +
+ onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all " +
"partition nodes left the grid): " + cacheCtx.name()));
return;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2903a29e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 0a8f87c..ff948a1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -5010,7 +5010,6 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
-
IgniteFuture fut = cache.future().chain(new IgniteClosure<IgniteFuture, Object>() {
@Override public Object apply(IgniteFuture o) {
return o.get();