You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2018/11/30 13:11:32 UTC
[2/2] ignite git commit: IGNITE-10352 Fix cache get request can be
mapped to node while partition in MOVING state - Fixes #5519.
IGNITE-10352 Fix cache get request can be mapped to node while partition in MOVING state - Fixes #5519.
Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/18aaee03
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/18aaee03
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/18aaee03
Branch: refs/heads/master
Commit: 18aaee039908821d9d962a7bf3659578164e375d
Parents: 74d342d
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Fri Nov 30 16:10:56 2018 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Fri Nov 30 16:10:56 2018 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheContext.java | 31 +-
.../dht/CacheDistributedGetFutureAdapter.java | 433 +++++++++++++-
.../distributed/dht/GridDhtCacheAdapter.java | 22 +-
.../distributed/dht/GridDhtGetSingleFuture.java | 2 +-
.../dht/GridPartitionedGetFuture.java | 571 ++++++-------------
.../dht/GridPartitionedSingleGetFuture.java | 349 ++++++++----
.../distributed/near/GridNearGetFuture.java | 453 +++------------
.../dht/CacheGetReadFromBackupFailoverTest.java | 257 +++++++++
.../testsuites/IgniteCacheTestSuite2.java | 2 +
9 files changed, 1218 insertions(+), 902 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/18aaee03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 1a8cf88..50db168 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -30,6 +30,7 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -2244,24 +2245,32 @@ public class GridCacheContext<K, V> implements Externalizable {
*/
@Nullable public ClusterNode selectAffinityNodeBalanced(
List<ClusterNode> affNodes,
+ Set<ClusterNode> invalidNodes,
int partitionId,
boolean canRemap
) {
if (!readLoadBalancingEnabled) {
if (!canRemap) {
+ // Find the next available node if we cannot wait for the next topology version.
for (ClusterNode node : affNodes) {
- if (ctx.discovery().alive(node))
+ if (ctx.discovery().alive(node) && !invalidNodes.contains(node))
return node;
}
return null;
}
- else
- return affNodes.get(0);
+ else {
+ ClusterNode first = affNodes.get(0);
+
+ return !invalidNodes.contains(first) ? first : null;
+ }
}
- if (!readFromBackup)
- return affNodes.get(0);
+ if (!readFromBackup){
+ ClusterNode first = affNodes.get(0);
+
+ return !invalidNodes.contains(first) ? first : null;
+ }
assert locMacs != null;
@@ -2270,7 +2279,7 @@ public class GridCacheContext<K, V> implements Externalizable {
ClusterNode n0 = null;
for (ClusterNode node : affNodes) {
- if ((canRemap || discovery().alive(node) && isOwner(node, partitionId))) {
+ if ((canRemap || discovery().alive(node)) && !invalidNodes.contains(node)) {
if (locMacs.equals(node.attribute(ATTR_MACS)))
return node;
@@ -2285,16 +2294,6 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
- * Check that node is owner for partition.
- * @param node Cluster node.
- * @param partitionId Partition ID.
- * @return {@code}
- */
- private boolean isOwner(ClusterNode node, int partitionId) {
- return topology().partitionState(node.id(), partitionId) == OWNING;
- }
-
- /**
* Prepare affinity field for builder (if possible).
*
* @param buider Builder.
http://git-wip-us.apache.org/repos/asf/ignite/blob/18aaee03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index b1a9be0..feea264 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -18,18 +18,40 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -40,8 +62,14 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.topolo
/**
*
*/
-public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCacheCompoundIdentityFuture<Map<K, V>>
- implements GridCacheFuture<Map<K, V>>, CacheGetFuture {
+public abstract class CacheDistributedGetFutureAdapter<K, V>
+ extends GridCacheCompoundIdentityFuture<Map<K, V>> implements CacheGetFuture {
+ /** Logger reference. */
+ protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Logger. */
+ protected static IgniteLogger log;
+
/** Default max remap count value. */
public static final int DFLT_MAX_REMAP_CNT = 3;
@@ -100,6 +128,10 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCacheCo
/** */
protected final boolean recovery;
+ /** */
+ protected Map<AffinityTopologyVersion, Map<Integer, Set<ClusterNode>>> invalidNodes = Collections.emptyMap();
+
+
/**
* @param cctx Context.
* @param keys Keys.
@@ -149,6 +181,29 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCacheCo
}
/**
+ * @param aclass Class.
+ */
+ protected void initLogger(Class<?> aclass){
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, aclass);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean trackable() {
+ return trackable;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void markNotTrackable() {
+ // Should not flip trackable flag from true to false since get future can be remapped.
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
* @param part Partition.
* @return {@code True} if partition is in owned state.
*/
@@ -157,12 +212,384 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCacheCo
}
/**
+ * @param fut Future.
+ */
+ protected void registrateFutureInMvccManager(GridCacheFuture<?> fut) {
+ if (!trackable) {
+ trackable = true;
+
+ cctx.mvcc().addFuture(fut, futId);
+ }
+ }
+
+ /**
+ * @param node Cluster node.
+ * @param part Invalid partition.
+ * @param topVer Topology version.
+ */
+ protected synchronized void addNodeAsInvalid(ClusterNode node, int part, AffinityTopologyVersion topVer) {
+ if (invalidNodes == Collections.<AffinityTopologyVersion, Map<Integer, Set<ClusterNode>>>emptyMap()) {
+ invalidNodes = new HashMap<>();
+ }
+
+ Map<Integer, Set<ClusterNode>> invalidNodeMap = invalidNodes.get(topVer);
+
+ if (invalidNodeMap == null)
+ invalidNodes.put(topVer, invalidNodeMap = new HashMap<>());
+
+ Set<ClusterNode> invalidNodeSet = invalidNodeMap.get(part);
+
+ if (invalidNodeSet == null)
+ invalidNodeMap.put(part, invalidNodeSet = new HashSet<>());
+
+ invalidNodeSet.add(node);
+ }
+
+ /**
+ * @param part Partition.
+ * @param topVer Topology version.
+ * @return Set of invalid cluster nodes.
+ */
+ protected synchronized Set<ClusterNode> getInvalidNodes(int part, AffinityTopologyVersion topVer) {
+ Set<ClusterNode> invalidNodeSet = Collections.emptySet();
+
+ Map<Integer, Set<ClusterNode>> invalidNodesMap = invalidNodes.get(topVer);
+
+ if (invalidNodesMap != null) {
+ Set<ClusterNode> nodes = invalidNodesMap.get(part);
+
+ if (nodes != null)
+ invalidNodeSet = nodes;
+ }
+
+ return invalidNodeSet;
+ }
+
+ /**
+ *
+ * @param key Key.
+ * @param node Mapped node.
+ * @param missedNodesToKeysMapping Full node mapping.
+ */
+ protected boolean checkRetryPermits(
+ KeyCacheObject key,
+ ClusterNode node,
+ Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> missedNodesToKeysMapping
+ ) {
+ LinkedHashMap<KeyCacheObject, Boolean> keys = missedNodesToKeysMapping.get(node);
+
+ if (keys != null && keys.containsKey(key)) {
+ if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) {
+ onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
+ MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
+ U.toShortString(node) + ", mappings=" + missedNodesToKeysMapping + ']'));
+
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onNodeLeft(UUID nodeId) {
+ boolean found = false;
+
+ for (IgniteInternalFuture<Map<K, V>> fut : futures())
+ if (isMini(fut)) {
+ AbstractMiniFuture f = (AbstractMiniFuture)fut;
+
+ if (f.node().id().equals(nodeId)) {
+ found = true;
+
+ f.onNodeLeft(new ClusterTopologyCheckedException("Remote node left grid (will retry): " + nodeId));
+ }
+ }
+
+ return found;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onResult(UUID nodeId, GridNearGetResponse res) {
+ for (IgniteInternalFuture<Map<K, V>> fut : futures())
+ if (isMini(fut)) {
+ AbstractMiniFuture f = (AbstractMiniFuture)fut;
+
+ if (f.futureId().equals(res.miniId())) {
+ assert f.node().id().equals(nodeId);
+
+ f.onResult(res);
+ }
+ }
+ }
+
+ /**
* @param part Partition.
* @param topVer Topology version.
* @return Exception.
*/
protected final ClusterTopologyServerNotFoundException serverNotFoundError(int part, AffinityTopologyVersion topVer) {
return new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid) [topVer=" + topVer + ", part" + part + ", cache=" + cctx.name() + ']');
+ "(all partition nodes left the grid) [topVer=" + topVer +
+ ", part" + part + ", cache=" + cctx.name() + ", localNodeId=" + cctx.localNodeId() + ']');
+ }
+
+ /**
+ * @param f Future.
+ * @return {@code True} if mini-future.
+ */
+ protected abstract boolean isMini(IgniteInternalFuture<?> f);
+
+ /**
+ * @param keys Collection of mapping keys.
+ * @param mapped Previous mapping.
+ * @param topVer Topology version.
+ */
+ protected abstract void map(
+ Collection<KeyCacheObject> keys,
+ Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
+ AffinityTopologyVersion topVer
+ );
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ Collection<String> futuresStrings = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
+ @SuppressWarnings("unchecked")
+ @Override public String apply(IgniteInternalFuture<?> f) {
+ if (isMini(f)) {
+ AbstractMiniFuture mini = (AbstractMiniFuture)f;
+
+ return "miniFuture([futId=" + mini.futureId() + ", node=" + mini.node().id() +
+ ", loc=" + mini.node().isLocal() +
+ ", done=" + f.isDone() + "])";
+ }
+ else
+ return f.getClass().getSimpleName() + " [loc=true, done=" + f.isDone() + "]";
+ }
+ });
+
+ return S.toString(CacheDistributedGetFutureAdapter.class, this,
+ "innerFuts", futuresStrings,
+ "super", super.toString());
+ }
+
+ /**
+ * Mini-future for get operations. Mini-futures are only waiting on a single
+ * node as opposed to multiple nodes.
+ */
+ protected abstract class AbstractMiniFuture extends GridFutureAdapter<Map<K, V>> {
+ /** Mini-future id. */
+ private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+ /** Mapped node. */
+ protected final ClusterNode node;
+
+ /** Mapped keys. */
+ @GridToStringInclude
+ protected final LinkedHashMap<KeyCacheObject, Boolean> keys;
+
+ /** Topology version on which this future was mapped. */
+ protected final AffinityTopologyVersion topVer;
+
+ /** Post processing closure. */
+ private final IgniteInClosure<Collection<GridCacheEntryInfo>> postProcessingClos;
+
+ /** {@code True} if remapped after node left. */
+ private boolean remapped;
+
+ /**
+ * @param node Node.
+ * @param keys Keys.
+ * @param topVer Topology version.
+ */
+ protected AbstractMiniFuture(
+ ClusterNode node,
+ LinkedHashMap<KeyCacheObject, Boolean> keys,
+ AffinityTopologyVersion topVer
+ ) {
+ this.node = node;
+ this.keys = keys;
+ this.topVer = topVer;
+ this.postProcessingClos = CU.createBackupPostProcessingClosure(
+ topVer, log, cctx, null, expiryPlc, readThrough, skipVals);
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Mapped node.
+ */
+ public ClusterNode node() {
+ return node;
+ }
+
+ /**
+ * @return Keys.
+ */
+ public Collection<KeyCacheObject> keys() {
+ return keys.keySet();
+ }
+
+ /**
+ * Factory method that generates the request associated with this mini-future.
+ *
+ * @param rootFutId Root future id.
+ * @return Near get request.
+ */
+ public GridNearGetRequest createGetRequest(IgniteUuid rootFutId) {
+ return createGetRequest0(rootFutId, futureId());
+ }
+
+ /**
+ * @param rootFutId Root future id.
+ * @param futId Mini future id.
+ * @return Near get request.
+ */
+ protected abstract GridNearGetRequest createGetRequest0(IgniteUuid rootFutId, IgniteUuid futId);
+
+ /**
+ * @param entries Collection of entries.
+ * @return Map with key value results.
+ */
+ protected abstract Map<K, V> createResultMap(Collection<GridCacheEntryInfo> entries);
+
+ /**
+ * @param e Error.
+ */
+ public void onResult(Throwable e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
+
+ // Fail.
+ onDone(e);
+ }
+
+ /**
+ * @param e Failure exception.
+ */
+ public synchronized void onNodeLeft(ClusterTopologyCheckedException e) {
+ if (remapped)
+ return;
+
+ remapped = true;
+
+ if (log.isDebugEnabled())
+ log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this);
+
+ // Try getting from existing nodes.
+ if (!canRemap) {
+ map(keys.keySet(), F.t(node, keys), topVer);
+
+ onDone(Collections.<K, V>emptyMap());
+ }
+ else {
+ long maxTopVer = Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion());
+
+ AffinityTopologyVersion awaitTopVer = new AffinityTopologyVersion(maxTopVer);
+
+ cctx.shared().exchange()
+ .affinityReadyFuture(awaitTopVer)
+ .listen((f) -> {
+ try {
+ // Remap.
+ map(keys.keySet(), F.t(node, keys), f.get());
+
+ onDone(Collections.<K, V>emptyMap());
+ }
+ catch (IgniteCheckedException ex) {
+ CacheDistributedGetFutureAdapter.this.onDone(ex);
+ }
+ }
+ );
+ }
+ }
+
+ /**
+ * @param res Result callback.
+ */
+ public void onResult(GridNearGetResponse res) {
+ // If error happened on remote node, fail the whole future.
+ if (res.error() != null) {
+ onDone(res.error());
+
+ return;
+ }
+
+ Collection<Integer> invalidParts = res.invalidPartitions();
+
+ // Remap invalid partitions.
+ if (!F.isEmpty(invalidParts)) {
+ AffinityTopologyVersion rmtTopVer = res.topologyVersion();
+
+ for (Integer part : invalidParts)
+ addNodeAsInvalid(node, part, topVer);
+
+ if (log.isDebugEnabled())
+ log.debug("Remapping mini get future [invalidParts=" + invalidParts + ", fut=" + this + ']');
+
+ if (!canRemap) {
+ map(F.view(keys.keySet(), new P1<KeyCacheObject>() {
+ @Override public boolean apply(KeyCacheObject key) {
+ return invalidParts.contains(cctx.affinity().partition(key));
+ }
+ }), F.t(node, keys), topVer);
+
+ postProcessResult(res);
+
+ onDone(createResultMap(res.entries()));
+
+ return;
+ }
+
+ // Remap after the remote topology version has been finished locally.
+ cctx.shared().exchange().affinityReadyFuture(rmtTopVer)
+ .listen(new CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void applyx(
+ IgniteInternalFuture<AffinityTopologyVersion> fut
+ ) throws IgniteCheckedException {
+ AffinityTopologyVersion topVer = fut.get();
+
+ // This will append new futures to compound list.
+ map(F.view(keys.keySet(), new P1<KeyCacheObject>() {
+ @Override public boolean apply(KeyCacheObject key) {
+ return invalidParts.contains(cctx.affinity().partition(key));
+ }
+ }), F.t(node, keys), topVer);
+
+ postProcessResult(res);
+
+ onDone(createResultMap(res.entries()));
+ }
+ });
+ }
+ else {
+ try {
+ postProcessResult(res);
+
+ onDone(createResultMap(res.entries()));
+ }
+ catch (Exception e) {
+ onDone(e);
+ }
+ }
+ }
+
+ /**
+ * @param res Response.
+ */
+ protected void postProcessResult(final GridNearGetResponse res) {
+ if (postProcessingClos != null)
+ postProcessingClos.apply(res.entries());
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(AbstractMiniFuture.class, this);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/18aaee03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 17bf4ca..611c87c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -969,12 +969,14 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
res0 = info.value();
}
- res = new GridNearSingleGetResponse(ctx.cacheId(),
+ res = new GridNearSingleGetResponse(
+ ctx.cacheId(),
req.futureId(),
null,
res0,
false,
- req.addDeploymentInfo());
+ req.addDeploymentInfo()
+ );
if (info != null && req.skipValues())
res.setContainsValue();
@@ -982,15 +984,14 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
else {
AffinityTopologyVersion topVer = ctx.shared().exchange().lastTopologyFuture().initialVersion();
- assert topVer.compareTo(req.topologyVersion()) > 0 : "Wrong ready topology version for " +
- "invalid partitions response [topVer=" + topVer + ", req=" + req + ']';
-
- res = new GridNearSingleGetResponse(ctx.cacheId(),
+ res = new GridNearSingleGetResponse(
+ ctx.cacheId(),
req.futureId(),
topVer,
null,
true,
- req.addDeploymentInfo());
+ req.addDeploymentInfo()
+ );
}
}
catch (NodeStoppingException ignored) {
@@ -1075,8 +1076,11 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
res.error(e);
}
- if (!F.isEmpty(fut.invalidPartitions()))
- res.invalidPartitions(fut.invalidPartitions(), ctx.shared().exchange().lastTopologyFuture().initialVersion());
+ if (!F.isEmpty(fut.invalidPartitions())){
+ AffinityTopologyVersion topVer = ctx.shared().exchange().lastTopologyFuture().initialVersion();
+
+ res.invalidPartitions(fut.invalidPartitions(), topVer);
+ }
try {
ctx.io().send(nodeId, res, ctx.ioPolicy());
http://git-wip-us.apache.org/repos/asf/ignite/blob/18aaee03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index e0fe8be..8588084 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -213,7 +213,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
*
*/
private void map() {
- // TODO get rid of force keys request https://issues.apache.org/jira/browse/IGNITE-10251
+ // TODO Get rid of force keys request https://issues.apache.org/jira/browse/IGNITE-10251.
if (cctx.group().preloader().needForceKeys()) {
GridDhtFuture<Object> fut = cctx.group().preloader().request(
cctx,
http://git-wip-us.apache.org/repos/asf/ignite/blob/18aaee03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 2fcd677..0629754 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -23,10 +23,9 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -43,7 +42,6 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTrackerImpl;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
@@ -52,17 +50,10 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -71,17 +62,9 @@ import org.jetbrains.annotations.Nullable;
*/
public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V>
implements MvccSnapshotResponseListener {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Logger reference. */
- private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
- /** Logger. */
- private static IgniteLogger log;
/** Transaction label. */
- private String txLbl;
+ protected final String txLbl;
/** */
protected final MvccSnapshot mvccSnapshot;
@@ -122,7 +105,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
@Nullable String txLbl,
@Nullable MvccSnapshot mvccSnapshot
) {
- super(cctx,
+ super(
+ cctx,
keys,
readThrough,
forcePrimary,
@@ -133,15 +117,16 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
skipVals,
needVer,
keepCacheObjects,
- recovery);
+ recovery
+ );
+
assert mvccSnapshot == null || cctx.mvccEnabled();
this.mvccSnapshot = mvccSnapshot;
this.txLbl = txLbl;
- if (log == null)
- log = U.logger(cctx.kernalContext(), logRef, GridPartitionedGetFuture.class);
+ initLogger(GridPartitionedGetFuture.class);
}
/**
@@ -169,14 +154,15 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
public void init(AffinityTopologyVersion topVer) {
AffinityTopologyVersion lockedTopVer = cctx.shared().lockedTopologyVersion(null);
+ // Cannot remap if we are in a transaction and locked on some topology version.
if (lockedTopVer != null) {
topVer = lockedTopVer;
canRemap = false;
}
- else {
- topVer = topVer.topologyVersion() > 0 ? topVer :
- canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
+ else{
+ // Use the affinity topology version if the constructor version is not specified.
+ topVer = topVer.topologyVersion() > 0 ? topVer : cctx.affinity().affinityTopologyVersion();
}
if (!cctx.mvccEnabled() || mvccSnapshot != null)
@@ -184,84 +170,33 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
else {
mvccTracker = new MvccQueryTrackerImpl(cctx, canRemap);
- trackable = true;
-
- cctx.mvcc().addFuture(this, futId);
+ registrateFutureInMvccManager(this);
mvccTracker.requestSnapshot(topVer, this);
}
}
- @Override public void onResponse(MvccSnapshot res) {
- AffinityTopologyVersion topVer = mvccTracker.topologyVersion();
-
- assert topVer != null;
-
- initialMap(topVer);
- }
-
- @Override public void onError(IgniteCheckedException e) {
- onDone(e);
- }
-
/**
* @param topVer Topology version.
*/
private void initialMap(AffinityTopologyVersion topVer) {
- map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
+ map(keys, Collections.emptyMap(), topVer);
markInitialized();
}
/** {@inheritDoc} */
- @Override public boolean trackable() {
- return trackable;
- }
+ @Override public void onResponse(MvccSnapshot res) {
+ AffinityTopologyVersion topVer = mvccTracker.topologyVersion();
- /** {@inheritDoc} */
- @Override public void markNotTrackable() {
- // Should not flip trackable flag from true to false since get future can be remapped.
- }
+ assert topVer != null;
- /** {@inheritDoc} */
- @Override public IgniteUuid futureId() {
- return futId;
+ initialMap(topVer);
}
/** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- boolean found = false;
-
- for (IgniteInternalFuture<Map<K, V>> fut : futures())
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
-
- if (f.node().id().equals(nodeId)) {
- found = true;
-
- f.onNodeLeft(new ClusterTopologyCheckedException("Remote node left grid (will retry): " + nodeId));
- }
- }
-
- return found;
- }
-
- /**
- * @param nodeId Sender.
- * @param res Result.
- */
- @Override public void onResult(UUID nodeId, GridNearGetResponse res) {
- for (IgniteInternalFuture<Map<K, V>> fut : futures()) {
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
-
- if (f.futureId().equals(res.miniId())) {
- assert f.node().id().equals(nodeId);
-
- f.onResult(res);
- }
- }
- }
+ @Override public void onError(IgniteCheckedException e) {
+ onDone(e);
}
/** {@inheritDoc} */
@@ -270,6 +205,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
if (trackable)
cctx.mvcc().removeFuture(futId);
+ MvccQueryTracker mvccTracker = this.mvccTracker;
+
if (mvccTracker != null)
mvccTracker.onDone();
@@ -281,11 +218,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
return false;
}
- /**
- * @param f Future.
- * @return {@code True} if mini-future.
- */
- private boolean isMini(IgniteInternalFuture<?> f) {
+ /** {@inheritDoc} */
+ @Override protected boolean isMini(IgniteInternalFuture<?> f) {
return f.getClass().equals(MiniFuture.class);
}
@@ -294,68 +228,60 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
* @param mapped Mappings to check for duplicates.
* @param topVer Topology version on which keys should be mapped.
*/
- private void map(
+ @Override protected void map(
Collection<KeyCacheObject> keys,
Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
- final AffinityTopologyVersion topVer
+ AffinityTopologyVersion topVer
) {
Collection<ClusterNode> cacheNodes = CU.affinityNodes(cctx, topVer);
- if (cacheNodes.isEmpty()) {
- onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'));
-
- return;
- }
-
- GridDhtTopologyFuture topFut = cctx.shared().exchange().lastFinishedFuture();
-
- Throwable err = topFut != null ? topFut.validateCache(cctx, recovery, true, null, keys) : null;
-
- if (err != null) {
- onDone(err);
+ validate(cacheNodes, topVer);
+ // Future can be already done with some exception.
+ if (isDone())
return;
- }
Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings = U.newHashMap(cacheNodes.size());
- final int keysSize = keys.size();
+ int keysSize = keys.size();
+ // Map for local (key,value) pairs.
Map<K, V> locVals = U.newHashMap(keysSize);
+ // True if we have remote nodes after key mapping complete.
boolean hasRmtNodes = false;
- // Assign keys to primary nodes.
+ // Assign keys to nodes.
for (KeyCacheObject key : keys)
- hasRmtNodes |= map(key, mappings, locVals, topVer, mapped);
+ hasRmtNodes |= map(key, topVer, mappings, mapped, locVals);
+ // Future can be already done with some exception.
if (isDone())
return;
+ // Add local read (key,value) in result.
if (!locVals.isEmpty())
add(new GridFinishedFuture<>(locVals));
- if (hasRmtNodes) {
- if (!trackable) {
- trackable = true;
-
- cctx.mvcc().addFuture(this, futId);
- }
- }
+ // If we have remote nodes in the mapping, we should register the future in the MVCC manager.
+ if (hasRmtNodes)
+ registrateFutureInMvccManager(this);
- // Create mini futures.
+ // Create mini futures after mapping to remote nodes.
for (Map.Entry<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> entry : mappings.entrySet()) {
- final ClusterNode n = entry.getKey();
+ // Node for request.
+ ClusterNode n = entry.getKey();
- final LinkedHashMap<KeyCacheObject, Boolean> mappedKeys = entry.getValue();
+ // Keys for request.
+ LinkedHashMap<KeyCacheObject, Boolean> mappedKeys = entry.getValue();
assert !mappedKeys.isEmpty();
// If this is the primary or backup node for the keys.
if (n.isLocal()) {
- final GridDhtFuture<Collection<GridCacheEntryInfo>> fut =
- cache().getDhtAsync(n.id(),
+ GridDhtFuture<Collection<GridCacheEntryInfo>> fut = cache()
+ .getDhtAsync(
+ n.id(),
-1,
mappedKeys,
false,
@@ -367,68 +293,50 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
skipVals,
recovery,
txLbl,
- mvccSnapshot());
+ mvccSnapshot()
+ );
- final Collection<Integer> invalidParts = fut.invalidPartitions();
+ Collection<Integer> invalidParts = fut.invalidPartitions();
if (!F.isEmpty(invalidParts)) {
Collection<KeyCacheObject> remapKeys = new ArrayList<>(keysSize);
for (KeyCacheObject key : keys) {
- if (key != null && invalidParts.contains(cctx.affinity().partition(key)))
+ int part = cctx.affinity().partition(key);
+
+ if (key != null && invalidParts.contains(part)) {
+ addNodeAsInvalid(n, part, topVer);
+
remapKeys.add(key);
+ }
}
AffinityTopologyVersion updTopVer = cctx.shared().exchange().readyAffinityVersion();
- assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " +
- "not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
- ", invalidParts=" + invalidParts + ']';
-
// Remap recursively.
map(remapKeys, mappings, updTopVer);
}
// Add new future.
- add(fut.chain(new C1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>, Map<K, V>>() {
- @Override public Map<K, V> apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut) {
- try {
- return createResultMap(fut.get());
- }
- catch (Exception e) {
- U.error(log, "Failed to get values from dht cache [fut=" + fut + "]", e);
+ add(fut.chain(f -> {
+ try {
+ return createResultMap(f.get());
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to get values from dht cache [fut=" + fut + "]", e);
- onDone(e);
+ onDone(e);
- return Collections.emptyMap();
- }
+ return Collections.emptyMap();
}
}));
}
else {
- MiniFuture fut = new MiniFuture(n, mappedKeys, topVer,
- CU.createBackupPostProcessingClosure(topVer, log, cctx, null, expiryPlc, readThrough, skipVals));
-
- GridCacheMessage req = new GridNearGetRequest(
- cctx.cacheId(),
- futId,
- fut.futureId(),
- null,
- mappedKeys,
- readThrough,
- topVer,
- subjId,
- taskName == null ? 0 : taskName.hashCode(),
- expiryPlc != null ? expiryPlc.forCreate() : -1L,
- expiryPlc != null ? expiryPlc.forAccess() : -1L,
- false,
- skipVals,
- cctx.deploymentEnabled(),
- recovery,
- txLbl,
- mvccSnapshot());
+ MiniFuture miniFut = new MiniFuture(n, mappedKeys, topVer);
- add(fut); // Append new future.
+ GridCacheMessage req = miniFut.createGetRequest(futId);
+
+ add(miniFut); // Append new future.
try {
cctx.io().send(n, req, cctx.ioPolicy());
@@ -436,83 +344,118 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
catch (IgniteCheckedException e) {
// Fail the whole thing.
if (e instanceof ClusterTopologyCheckedException)
- fut.onNodeLeft((ClusterTopologyCheckedException)e);
+ miniFut.onNodeLeft((ClusterTopologyCheckedException)e);
else
- fut.onResult(e);
+ miniFut.onResult(e);
}
}
}
}
/**
- * @param mappings Mappings.
+ * @param nodesToKeysMapping Mappings.
* @param key Key to map.
* @param locVals Local values.
* @param topVer Topology version.
- * @param mapped Previously mapped.
+ * @param missedNodesToKeysMapping Previously mapped.
* @return {@code True} if has remote nodes.
*/
private boolean map(
KeyCacheObject key,
- Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings,
- Map<K, V> locVals,
AffinityTopologyVersion topVer,
- Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped
+ Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> nodesToKeysMapping,
+ Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> missedNodesToKeysMapping,
+ Map<K, V> locVals
) {
int part = cctx.affinity().partition(key);
List<ClusterNode> affNodes = cctx.affinity().nodesByPartition(part, topVer);
+ // Fail if no affinity node is found.
if (affNodes.isEmpty()) {
onDone(serverNotFoundError(part, topVer));
return false;
}
- // Local get cannot be used with MVCC as local node can contain some visible version which is not latest.
- boolean fastLocGet = !cctx.mvccEnabled() && (!forcePrimary || affNodes.get(0).isLocal()) &&
- cctx.reserveForFastLocalGet(part, topVer);
+ // Try to read key locally if we can.
+ if (tryLocalGet(key, part, topVer, affNodes, locVals))
+ return false;
- if (fastLocGet) {
- try {
- if (localGet(topVer, key, part, locVals))
- return false;
- }
- finally {
- cctx.releaseForFastLocalGet(part, topVer);
- }
- }
+ Set<ClusterNode> invalidNodeSet = getInvalidNodes(part, topVer);
- ClusterNode node = cctx.selectAffinityNodeBalanced(affNodes, part, canRemap);
+ // Get remote node for request for this key.
+ ClusterNode node = cctx.selectAffinityNodeBalanced(affNodes, invalidNodeSet, part, canRemap);
+ // Fail if no remote node is found.
if (node == null) {
onDone(serverNotFoundError(part, topVer));
return false;
}
+ // The node can still be local, see implementation details of #tryLocalGet().
boolean remote = !node.isLocal();
- LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(node);
+ // Check retry counter, which is bounded to avoid infinite remap.
+ if (!checkRetryPermits(key, node, missedNodesToKeysMapping))
+ return false;
- if (keys != null && keys.containsKey(key)) {
- if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) {
- onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
- MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
- U.toShortString(node) + ", mappings=" + mapped + ']'));
+ addNodeMapping(key, node, nodesToKeysMapping);
- return false;
- }
- }
+ return remote;
+ }
+ /**
+ *
+ * @param key Key.
+ * @param node Mapped node.
+ * @param mappings Full node mapping.
+ */
+ private void addNodeMapping(
+ KeyCacheObject key,
+ ClusterNode node,
+ Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings
+ ) {
LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(node);
if (old == null)
mappings.put(node, old = new LinkedHashMap<>(3, 1f));
old.put(key, false);
+ }
- return remote;
+ /**
+ *
+ * @param key Key.
+ * @param part Partition.
+ * @param topVer Topology version.
+ * @param affNodes Affinity nodes.
+ * @param locVals Map for local (key,value) pairs.
+ */
+ private boolean tryLocalGet(
+ KeyCacheObject key,
+ int part,
+ AffinityTopologyVersion topVer,
+ List<ClusterNode> affNodes,
+ Map<K, V> locVals
+ ) {
+ // Local get cannot be used with MVCC as local node can contain some visible version which is not latest.
+ boolean fastLocGet = !cctx.mvccEnabled() &&
+ (!forcePrimary || affNodes.get(0).isLocal()) &&
+ cctx.reserveForFastLocalGet(part, topVer);
+
+ if (fastLocGet) {
+ try {
+ if (localGet(topVer, key, part, locVals))
+ return true;
+ }
+ finally {
+ cctx.releaseForFastLocalGet(part, topVer);
+ }
+ }
+
+ return false;
}
/**
@@ -662,6 +605,27 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
}
/**
+ *
+ * @param cacheNodes Cache affinity nodes.
+ * @param topVer Topology version.
+ */
+ private void validate(Collection<ClusterNode> cacheNodes, AffinityTopologyVersion topVer) {
+ if (cacheNodes.isEmpty()) {
+ onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'));
+
+ return;
+ }
+
+ GridDhtTopologyFuture topFut = cctx.shared().exchange().lastFinishedFuture();
+
+ Throwable err = topFut != null ? topFut.validateCache(cctx, recovery, true, null, keys) : null;
+
+ if (err != null)
+ onDone(err);
+ }
+
+ /**
* @return Near cache.
*/
private GridDhtCacheAdapter<K, V> cache() {
@@ -701,21 +665,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
/** {@inheritDoc} */
@Override public String toString() {
- Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
- @SuppressWarnings("unchecked")
- @Override public String apply(IgniteInternalFuture<?> f) {
- if (isMini(f)) {
- return "[node=" + ((MiniFuture)f).node().id() +
- ", loc=" + ((MiniFuture)f).node().isLocal() +
- ", done=" + f.isDone() + "]";
- }
- else
- return "[loc=true, done=" + f.isDone() + "]";
- }
- });
-
return S.toString(GridPartitionedGetFuture.class, this,
- "innerFuts", futs,
"super", super.toString());
}
@@ -723,197 +673,46 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
* Mini-future for get operations. Mini-futures are only waiting on a single
* node as opposed to multiple nodes.
*/
- private class MiniFuture extends GridFutureAdapter<Map<K, V>> {
- /** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
-
- /** Node ID. */
- private final ClusterNode node;
-
- /** Keys. */
- @GridToStringInclude
- private final LinkedHashMap<KeyCacheObject, Boolean> keys;
-
- /** Topology version on which this future was mapped. */
- private final AffinityTopologyVersion topVer;
-
- /** Post processing closure. */
- private final IgniteInClosure<Collection<GridCacheEntryInfo>> postProcessingClos;
-
- /** {@code True} if remapped after node left. */
- private boolean remapped;
-
+ private class MiniFuture extends AbstractMiniFuture {
/**
* @param node Node.
* @param keys Keys.
* @param topVer Topology version.
- * @param postProcessingClos Post processing closure.
*/
- MiniFuture(ClusterNode node, LinkedHashMap<KeyCacheObject, Boolean> keys, AffinityTopologyVersion topVer,
- @Nullable IgniteInClosure<Collection<GridCacheEntryInfo>> postProcessingClos) {
- this.node = node;
- this.keys = keys;
- this.topVer = topVer;
- this.postProcessingClos = postProcessingClos;
+ public MiniFuture(
+ ClusterNode node,
+ LinkedHashMap<KeyCacheObject, Boolean> keys,
+ AffinityTopologyVersion topVer
+ ) {
+ super(node, keys, topVer);
}
- /**
- * @return Future ID.
- */
- IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * @return Node ID.
- */
- public ClusterNode node() {
- return node;
- }
-
- /**
- * @return Keys.
- */
- public Collection<KeyCacheObject> keys() {
- return keys.keySet();
- }
-
- /**
- * @param e Error.
- */
- void onResult(Throwable e) {
- if (log.isDebugEnabled())
- log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
-
- // Fail.
- onDone(e);
- }
-
- /**
- * @param e Failure exception.
- */
- synchronized void onNodeLeft(ClusterTopologyCheckedException e) {
- if (remapped)
- return;
-
- remapped = true;
-
- if (log.isDebugEnabled())
- log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this);
-
- // Try getting from existing nodes.
- if (!canRemap) {
- map(keys.keySet(), F.t(node, keys), topVer);
-
- onDone(Collections.<K, V>emptyMap());
- }
- else {
- AffinityTopologyVersion updTopVer =
- new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
-
- cctx.shared().exchange().affinityReadyFuture(updTopVer).listen(
- new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
- try {
- // Remap.
- map(keys.keySet(), F.t(node, keys), fut.get());
-
- onDone(Collections.<K, V>emptyMap());
- }
- catch (IgniteCheckedException e) {
- GridPartitionedGetFuture.this.onDone(e);
- }
- }
- }
- );
- }
- }
-
- /**
- * @param res Result callback.
- */
- void onResult(final GridNearGetResponse res) {
- final Collection<Integer> invalidParts = res.invalidPartitions();
-
- // If error happened on remote node, fail the whole future.
- if (res.error() != null) {
- onDone(res.error());
-
- return;
- }
-
- // Remap invalid partitions.
- if (!F.isEmpty(invalidParts)) {
- AffinityTopologyVersion rmtTopVer = res.topologyVersion();
-
- assert !rmtTopVer.equals(AffinityTopologyVersion.ZERO);
-
- if (rmtTopVer.compareTo(topVer) <= 0) {
- // Fail the whole get future.
- onDone(new IgniteCheckedException("Failed to process invalid partitions response (remote node reported " +
- "invalid partitions but remote topology version does not differ from local) " +
- "[topVer=" + topVer + ", rmtTopVer=" + rmtTopVer + ", invalidParts=" + invalidParts +
- ", nodeId=" + node.id() + ']'));
-
- return;
- }
-
- if (log.isDebugEnabled())
- log.debug("Remapping mini get future [invalidParts=" + invalidParts + ", fut=" + this + ']');
-
- if (!canRemap) {
- map(F.view(keys.keySet(), new P1<KeyCacheObject>() {
- @Override public boolean apply(KeyCacheObject key) {
- return invalidParts.contains(cctx.affinity().partition(key));
- }
- }), F.t(node, keys), topVer);
-
- postProcessResult(res);
-
- onDone(createResultMap(res.entries()));
-
- return;
- }
-
- // Need to wait for next topology version to remap.
- IgniteInternalFuture<AffinityTopologyVersion> topFut = cctx.shared().exchange().affinityReadyFuture(rmtTopVer);
-
- topFut.listen(new CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void applyx(
- IgniteInternalFuture<AffinityTopologyVersion> fut) throws IgniteCheckedException {
- AffinityTopologyVersion topVer = fut.get();
-
- // This will append new futures to compound list.
- map(F.view(keys.keySet(), new P1<KeyCacheObject>() {
- @Override public boolean apply(KeyCacheObject key) {
- return invalidParts.contains(cctx.affinity().partition(key));
- }
- }), F.t(node, keys), topVer);
-
- postProcessResult(res);
-
- onDone(createResultMap(res.entries()));
- }
- });
- }
- else {
- try {
- postProcessResult(res);
-
- onDone(createResultMap(res.entries()));
- }
- catch (Exception e) {
- onDone(e);
- }
- }
+ /** {@inheritDoc} */
+ @Override protected GridNearGetRequest createGetRequest0(IgniteUuid rootFutId, IgniteUuid futId) {
+ return new GridNearGetRequest(
+ cctx.cacheId(),
+ rootFutId,
+ futId,
+ null,
+ keys,
+ readThrough,
+ topVer,
+ subjId,
+ taskName == null ? 0 : taskName.hashCode(),
+ expiryPlc != null ? expiryPlc.forCreate() : -1L,
+ expiryPlc != null ? expiryPlc.forAccess() : -1L,
+ false,
+ skipVals,
+ cctx.deploymentEnabled(),
+ recovery,
+ txLbl,
+ mvccSnapshot()
+ );
}
- /**
- * @param res Response.
- */
- private void postProcessResult(final GridNearGetResponse res) {
- if (postProcessingClos != null)
- postProcessingClos.apply(res.entries());
+ /** {@inheritDoc} */
+ @Override protected Map<K, V> createResultMap(Collection<GridCacheEntryInfo> entries) {
+ return GridPartitionedGetFuture.this.createResultMap(entries);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/18aaee03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 4d0e129..0d69485 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -18,8 +18,12 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -51,8 +55,6 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -61,13 +63,25 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
/**
*
*/
-public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Object> implements GridCacheFuture<Object>,
- CacheGetFuture, IgniteDiagnosticAware {
+public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Object>
+ implements CacheGetFuture, IgniteDiagnosticAware {
+ /** Default max remap count value. */
+ public static final int DFLT_MAX_REMAP_CNT = 3;
+
+ /** Maximum number of attempts to remap key to the same primary node. */
+ protected static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT);
+
+ /** Remap count updater. */
+ protected static final AtomicIntegerFieldUpdater<GridPartitionedSingleGetFuture> REMAP_CNT_UPD =
+ AtomicIntegerFieldUpdater.newUpdater(GridPartitionedSingleGetFuture.class, "remapCnt");
+
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
@@ -133,7 +147,13 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
private volatile BackupPostProcessingClosure postProcessingClos;
/** Transaction label. */
- private String txLbl;
+ private final String txLbl;
+
+ /** Invalid mapped nodes. */
+ private Set<ClusterNode> invalidNodes = Collections.emptySet();
+
+ /** Remap count. */
+ protected volatile int remapCnt;
/**
* @param cctx Context.
@@ -204,30 +224,30 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
}
/**
- *
+ * Initialize future.
*/
public void init() {
- AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer :
- canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
-
- GridDhtTopologyFuture topFut = cctx.shared().exchange().lastFinishedFuture();
-
- Throwable err = topFut != null ? topFut.validateCache(cctx, recovery, true, key, null) : null;
+ AffinityTopologyVersion mappingtopVermappingtopVer;
- if (err != null) {
- onDone(err);
-
- return;
+ if (topVer.topologyVersion() > 0)
+ mappingtopVermappingtopVer = topVer;
+ else {
+ mappingtopVermappingtopVer = canRemap ?
+ cctx.affinity().affinityTopologyVersion() :
+ cctx.shared().exchange().readyAffinityVersion();
}
- map(topVer);
+ map(mappingtopVermappingtopVer);
}
/**
* @param topVer Topology version.
*/
@SuppressWarnings("unchecked")
- private void map(final AffinityTopologyVersion topVer) {
+ private void map(AffinityTopologyVersion topVer) {
+ if (!validate(topVer))
+ return;
+
ClusterNode node = mapKeyToNode(topVer);
if (node == null) {
@@ -239,46 +259,46 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
if (isDone())
return;
+ // Read value if node is localNode.
if (node.isLocal()) {
- final GridDhtFuture<GridCacheEntryInfo> fut = cctx.dht().getDhtSingleAsync(node.id(),
- -1,
- key,
- false,
- readThrough,
- topVer,
- subjId,
- taskName == null ? 0 : taskName.hashCode(),
- expiryPlc,
- skipVals,
- recovery,
- txLbl,
- mvccSnapshot);
-
- final Collection<Integer> invalidParts = fut.invalidPartitions();
+ GridDhtFuture<GridCacheEntryInfo> fut = cctx.dht()
+ .getDhtSingleAsync(
+ node.id(),
+ -1,
+ key,
+ false,
+ readThrough,
+ topVer,
+ subjId,
+ taskName == null ? 0 : taskName.hashCode(),
+ expiryPlc,
+ skipVals,
+ recovery,
+ txLbl,
+ mvccSnapshot
+ );
+
+ Collection<Integer> invalidParts = fut.invalidPartitions();
if (!F.isEmpty(invalidParts)) {
- AffinityTopologyVersion updTopVer = cctx.shared().exchange().readyAffinityVersion();
+ addNodeAsInvalid(node);
- assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology " +
- "version did not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
- ", invalidParts=" + invalidParts + ']';
+ AffinityTopologyVersion updTopVer = cctx.shared().exchange().readyAffinityVersion();
// Remap recursively.
map(updTopVer);
}
else {
- fut.listen(new CI1<IgniteInternalFuture<GridCacheEntryInfo>>() {
- @Override public void apply(IgniteInternalFuture<GridCacheEntryInfo> fut) {
- try {
- GridCacheEntryInfo info = fut.get();
+ fut.listen(f -> {
+ try {
+ GridCacheEntryInfo info = f.get();
- setResult(info);
- }
- catch (Exception e) {
- U.error(log, "Failed to get values from dht cache [fut=" + fut + "]", e);
+ setResult(info);
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to get values from dht cache [fut=" + fut + "]", e);
- onDone(e);
- }
+ onDone(e);
}
});
}
@@ -291,15 +311,11 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
this.node = node;
}
- if (!trackable) {
- trackable = true;
-
- cctx.mvcc().addFuture(this, futId);
- }
+ registrateFutureInMvccManager(this);
boolean needVer = this.needVer;
- final BackupPostProcessingClosure postClos = CU.createBackupPostProcessingClosure(topVer, log,
+ BackupPostProcessingClosure postClos = CU.createBackupPostProcessingClosure(topVer, log,
cctx, key, expiryPlc, readThrough, skipVals);
if (postClos != null) {
@@ -309,7 +325,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
postProcessingClos = postClos;
}
- GridCacheMessage req = new GridNearSingleGetRequest(cctx.cacheId(),
+ GridCacheMessage req = new GridNearSingleGetRequest(
+ cctx.cacheId(),
futId.localId(),
key,
readThrough,
@@ -324,7 +341,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
cctx.deploymentEnabled(),
recovery,
txLbl,
- mvccSnapshot);
+ mvccSnapshot
+ );
try {
cctx.io().send(node, req, cctx.ioPolicy());
@@ -347,35 +365,58 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
List<ClusterNode> affNodes = cctx.affinity().nodesByPartition(part, topVer);
+ // Fail if no affinity node is found by assignment.
if (affNodes.isEmpty()) {
onDone(serverNotFoundError(part, topVer));
return null;
}
+ // Try to read key locally if we can.
+ if (tryLocalGet(key, part, topVer, affNodes))
+ return null;
+
+ ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, getInvalidNodes(), part, canRemap);
+
+ // Fail if no balanced node is found.
+ if (affNode == null) {
+ onDone(serverNotFoundError(part, topVer));
+
+ return null;
+ }
+
+ return affNode;
+ }
+
+ /**
+ *
+ * @param key Key.
+ * @param part Partition.
+ * @param topVer Topology version.
+ * @param affNodes Affinity nodes.
+ */
+ private boolean tryLocalGet(
+ KeyCacheObject key,
+ int part,
+ AffinityTopologyVersion topVer,
+ List<ClusterNode> affNodes
+ ) {
// Local get cannot be used with MVCC as local node can contain some visible version which is not latest.
- boolean fastLocGet = !cctx.mvccEnabled() && (!forcePrimary || affNodes.get(0).isLocal()) &&
+ boolean fastLocGet = !cctx.mvccEnabled() &&
+ (!forcePrimary || affNodes.get(0).isLocal()) &&
cctx.reserveForFastLocalGet(part, topVer);
if (fastLocGet) {
try {
- if (localGet(topVer, part))
- return null;
+ if (localGet(topVer, key, part))
+ return true;
}
finally {
cctx.releaseForFastLocalGet(part, topVer);
}
}
- ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, part, canRemap);
-
- if (affNode == null) {
- onDone(serverNotFoundError(part, topVer));
-
- return null;
- }
-
- return affNode;
+ return false;
}
/**
@@ -383,7 +424,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
* @param part Partition.
* @return {@code True} if future completed.
*/
- private boolean localGet(AffinityTopologyVersion topVer, int part) {
+ private boolean localGet(AffinityTopologyVersion topVer, KeyCacheObject key, int part) {
assert cctx.affinityNode() : this;
GridDhtCacheAdapter colocated = cctx.dht();
@@ -522,12 +563,28 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
}
/**
+ * @param fut Future.
+ */
+ private void registrateFutureInMvccManager(GridCacheFuture<?> fut) {
+ if (!trackable) {
+ trackable = true;
+
+ cctx.mvcc().addFuture(fut, futId);
+ }
+ }
+
+ /**
* @param nodeId Node ID.
* @param res Result.
*/
public void onResult(UUID nodeId, GridNearSingleGetResponse res) {
- if (!processResponse(nodeId) ||
- !checkError(res.error(), res.invalidPartitions(), res.topologyVersion(), nodeId))
+ // Break here if the response is from an unexpected node.
+ if (!processResponse(nodeId))
+ return;
+
+ // Break here if an exception was thrown on the remote node or
+ // the partition on the remote node is invalid.
+ if (!checkError(nodeId, res.invalidPartitions(), res.topologyVersion(), res.error()))
return;
Message res0 = res.result();
@@ -562,13 +619,15 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
}
}
- /**
- * @param nodeId Node ID.
- * @param res Result.
- */
+ /** {@inheritDoc} */
@Override public void onResult(UUID nodeId, GridNearGetResponse res) {
- if (!processResponse(nodeId) ||
- !checkError(res.error(), !F.isEmpty(res.invalidPartitions()), res.topologyVersion(), nodeId))
+ // Break here if the response is from an unexpected node.
+ if (!processResponse(nodeId))
+ return;
+
+ // Break here if an exception was thrown on the remote node or
+ // the partition on the remote node is invalid.
+ if (!checkError(nodeId, !F.isEmpty(res.invalidPartitions()), res.topologyVersion(), res.error()))
return;
Collection<GridCacheEntryInfo> infos = res.entries();
@@ -601,10 +660,12 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
* @param nodeId Node ID.
* @return {@code True} if should process received response.
*/
- private boolean checkError(@Nullable IgniteCheckedException err,
+ private boolean checkError(
+ UUID nodeId,
boolean invalidParts,
AffinityTopologyVersion rmtTopVer,
- UUID nodeId) {
+ @Nullable IgniteCheckedException err
+ ) {
if (err != null) {
onDone(err);
@@ -612,34 +673,10 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
}
if (invalidParts) {
- assert !rmtTopVer.equals(AffinityTopologyVersion.ZERO);
-
- if (rmtTopVer.compareTo(topVer) <= 0) {
- // Fail the whole get future.
- onDone(new IgniteCheckedException("Failed to process invalid partitions response (remote node reported " +
- "invalid partitions but remote topology version does not differ from local) " +
- "[topVer=" + topVer + ", rmtTopVer=" + rmtTopVer + ", part=" + cctx.affinity().partition(key) +
- ", nodeId=" + nodeId + ']'));
-
- return false;
- }
+ addNodeAsInvalid(cctx.node(nodeId));
if (canRemap) {
- IgniteInternalFuture<AffinityTopologyVersion> topFut = cctx.shared().exchange().affinityReadyFuture(rmtTopVer);
-
- topFut.listen(new CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void applyx(IgniteInternalFuture<AffinityTopologyVersion> fut) {
- try {
- AffinityTopologyVersion topVer = fut.get();
-
- remap(topVer);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
- }
- });
-
+ awaitVersionAndRemap(rmtTopVer);
}
else
map(topVer);
@@ -728,6 +765,43 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
}
/**
+ * @param node Invalid node.
+ */
+ private synchronized void addNodeAsInvalid(ClusterNode node) {
+ if (invalidNodes == Collections.<ClusterNode>emptySet())
+ invalidNodes = new HashSet<>();
+
+ invalidNodes.add(node);
+ }
+
+ /**
+ * @return Set of invalid cluster nodes.
+ */
+ protected synchronized Set<ClusterNode> getInvalidNodes() {
+ return invalidNodes;
+ }
+
+ /**
+ * @param topVer Topology version.
+ */
+ private boolean checkRetryPermits(AffinityTopologyVersion topVer) {
+ if (topVer.equals(this.topVer))
+ return true;
+
+ if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) {
+ ClusterNode node0 = node;
+
+ onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
+ MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
+ (node0 != null ? U.toShortString(node0) : node0) + ", invalidNodes=" + invalidNodes + ']'));
+
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
* @param part Partition.
* @param topVer Topology version.
* @return Exception.
@@ -737,6 +811,27 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
"(all partition nodes left the grid) [topVer=" + topVer + ", part=" + part + ", cache=" + cctx.name() + ']');
}
+ /**
+ * @param topVer Topology version.
+ * @return {@code True} if validation succeeded, {@code false} otherwise.
+ */
+ private boolean validate(AffinityTopologyVersion topVer) {
+ if (!checkRetryPermits(topVer))
+ return false;
+
+ GridDhtTopologyFuture lastFut = cctx.shared().exchange().lastFinishedFuture();
+
+ Throwable error = lastFut.validateCache(cctx, recovery, true, key, null);
+
+ if (error != null) {
+ onDone(error);
+
+ return false;
+ }
+ else
+ return true;
+ }
+
/** {@inheritDoc} */
@Override public IgniteUuid futureId() {
return futId;
@@ -748,20 +843,9 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
return false;
if (canRemap) {
- AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(
- Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
-
- cctx.shared().exchange().affinityReadyFuture(updTopVer).listen(
- new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
- try {
- remap(fut.get());
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
- }
- });
+ long maxTopVer = Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion());
+
+ awaitVersionAndRemap(new AffinityTopologyVersion(maxTopVer));
}
else
remap(topVer);
@@ -772,15 +856,30 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
/**
* @param topVer Topology version.
*/
+ private void awaitVersionAndRemap(AffinityTopologyVersion topVer){
+ IgniteInternalFuture<AffinityTopologyVersion> awaitTopologyVersionFuture =
+ cctx.shared().exchange().affinityReadyFuture(topVer);
+
+ awaitTopologyVersionFuture.listen(f -> {
+ try {
+ remap(f.get());
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ });
+ }
+
+ /**
+ * @param topVer Topology version.
+ */
private void remap(final AffinityTopologyVersion topVer) {
cctx.closures().runLocalSafe(new Runnable() {
@Override public void run() {
- GridDhtTopologyFuture lastFut = cctx.shared().exchange().lastFinishedFuture();
-
- Throwable error = lastFut.validateCache(cctx, recovery, true, key, null);
-
- if (error != null)
- onDone(error);
+ // If topology changed reset collection of invalid nodes.
+ synchronized (this) {
+ invalidNodes = Collections.emptySet();
+ }
map(topVer);
}
@@ -834,6 +933,6 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridPartitionedSingleGetFuture.class, this);
+ return S.toString(GridPartitionedSingleGetFuture.class, this, "super", super.toString());
}
}