You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2018/11/30 13:11:32 UTC

[2/2] ignite git commit: IGNITE-10352 Fix cache get request can be mapped to node while partition in MOVING state - Fixes #5519.

IGNITE-10352 Fix cache get request can be mapped to node while partition in MOVING state - Fixes #5519.

Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/18aaee03
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/18aaee03
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/18aaee03

Branch: refs/heads/master
Commit: 18aaee039908821d9d962a7bf3659578164e375d
Parents: 74d342d
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Fri Nov 30 16:10:56 2018 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Fri Nov 30 16:10:56 2018 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheContext.java      |  31 +-
 .../dht/CacheDistributedGetFutureAdapter.java   | 433 +++++++++++++-
 .../distributed/dht/GridDhtCacheAdapter.java    |  22 +-
 .../distributed/dht/GridDhtGetSingleFuture.java |   2 +-
 .../dht/GridPartitionedGetFuture.java           | 571 ++++++-------------
 .../dht/GridPartitionedSingleGetFuture.java     | 349 ++++++++----
 .../distributed/near/GridNearGetFuture.java     | 453 +++------------
 .../dht/CacheGetReadFromBackupFailoverTest.java | 257 +++++++++
 .../testsuites/IgniteCacheTestSuite2.java       |   2 +
 9 files changed, 1218 insertions(+), 902 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/18aaee03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 1a8cf88..50db168 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -30,6 +30,7 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -2244,24 +2245,32 @@ public class GridCacheContext<K, V> implements Externalizable {
      */
     @Nullable public ClusterNode selectAffinityNodeBalanced(
         List<ClusterNode> affNodes,
+        Set<ClusterNode> invalidNodes,
         int partitionId,
         boolean canRemap
     ) {
         if (!readLoadBalancingEnabled) {
             if (!canRemap) {
+                // Find the next available node if we cannot wait for the next topology version.
                 for (ClusterNode node : affNodes) {
-                    if (ctx.discovery().alive(node))
+                    if (ctx.discovery().alive(node) && !invalidNodes.contains(node))
                         return node;
                 }
 
                 return null;
             }
-            else
-                return affNodes.get(0);
+            else {
+                ClusterNode first = affNodes.get(0);
+
+                return !invalidNodes.contains(first) ? first : null;
+            }
         }
 
-        if (!readFromBackup)
-            return affNodes.get(0);
+        if (!readFromBackup){
+            ClusterNode first = affNodes.get(0);
+
+            return !invalidNodes.contains(first) ? first : null;
+        }
 
         assert locMacs != null;
 
@@ -2270,7 +2279,7 @@ public class GridCacheContext<K, V> implements Externalizable {
         ClusterNode n0 = null;
 
         for (ClusterNode node : affNodes) {
-            if ((canRemap || discovery().alive(node) && isOwner(node, partitionId))) {
+            if ((canRemap || discovery().alive(node)) && !invalidNodes.contains(node)) {
                 if (locMacs.equals(node.attribute(ATTR_MACS)))
                     return node;
 
@@ -2285,16 +2294,6 @@ public class GridCacheContext<K, V> implements Externalizable {
     }
 
     /**
-     *  Check that node is owner for partition.
-     * @param node Cluster node.
-     * @param partitionId Partition ID.
-     * @return {@code}
-     */
-    private boolean isOwner(ClusterNode node, int partitionId) {
-        return topology().partitionState(node.id(), partitionId) == OWNING;
-    }
-
-    /**
      * Prepare affinity field for builder (if possible).
      *
      * @param buider Builder.

http://git-wip-us.apache.org/repos/asf/ignite/blob/18aaee03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index b1a9be0..feea264 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -18,18 +18,40 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -40,8 +62,14 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.topolo
 /**
  *
  */
-public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCacheCompoundIdentityFuture<Map<K, V>>
-    implements GridCacheFuture<Map<K, V>>, CacheGetFuture {
+public abstract class CacheDistributedGetFutureAdapter<K, V>
+    extends GridCacheCompoundIdentityFuture<Map<K, V>> implements CacheGetFuture {
+    /** Logger reference. */
+    protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** Logger. */
+    protected static IgniteLogger log;
+
     /** Default max remap count value. */
     public static final int DFLT_MAX_REMAP_CNT = 3;
 
@@ -100,6 +128,10 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCacheCo
     /** */
     protected final boolean recovery;
 
+    /** */
+    protected Map<AffinityTopologyVersion, Map<Integer, Set<ClusterNode>>> invalidNodes = Collections.emptyMap();
+
+
     /**
      * @param cctx Context.
      * @param keys Keys.
@@ -149,6 +181,29 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCacheCo
     }
 
     /**
+     * @param aclass Class.
+     */
+    protected void initLogger(Class<?> aclass){
+        if (log == null)
+            log = U.logger(cctx.kernalContext(), logRef, aclass);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean trackable() {
+        return trackable;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void markNotTrackable() {
+        // Should not flip trackable flag from true to false since get future can be remapped.
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /**
      * @param part Partition.
      * @return {@code True} if partition is in owned state.
      */
@@ -157,12 +212,384 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCacheCo
     }
 
     /**
+     * @param fut Future.
+     */
+    protected void registrateFutureInMvccManager(GridCacheFuture<?> fut) {
+        if (!trackable) {
+            trackable = true;
+
+            cctx.mvcc().addFuture(fut, futId);
+        }
+    }
+
+    /**
+     * @param node Cluster node.
+     * @param part Invalid partition.
+     * @param topVer Topology version.
+     */
+    protected synchronized void addNodeAsInvalid(ClusterNode node, int part, AffinityTopologyVersion topVer) {
+        if (invalidNodes == Collections.<AffinityTopologyVersion, Map<Integer, Set<ClusterNode>>>emptyMap()) {
+            invalidNodes = new HashMap<>();
+        }
+
+        Map<Integer, Set<ClusterNode>> invalidNodeMap = invalidNodes.get(topVer);
+
+        if (invalidNodeMap == null)
+            invalidNodes.put(topVer, invalidNodeMap = new HashMap<>());
+
+        Set<ClusterNode> invalidNodeSet = invalidNodeMap.get(part);
+
+        if (invalidNodeSet == null)
+            invalidNodeMap.put(part, invalidNodeSet = new HashSet<>());
+
+        invalidNodeSet.add(node);
+    }
+
+    /**
+     * @param part Partition.
+     * @param topVer Topology version.
+     * @return Set of invalid cluster nodes.
+     */
+    protected synchronized Set<ClusterNode> getInvalidNodes(int part, AffinityTopologyVersion topVer) {
+        Set<ClusterNode> invalidNodeSet = Collections.emptySet();
+
+        Map<Integer, Set<ClusterNode>> invalidNodesMap = invalidNodes.get(topVer);
+
+        if (invalidNodesMap != null) {
+            Set<ClusterNode> nodes = invalidNodesMap.get(part);
+
+            if (nodes != null)
+                invalidNodeSet = nodes;
+        }
+
+        return invalidNodeSet;
+    }
+
+    /**
+     *
+     * @param key Key.
+     * @param node Mapped node.
+     * @param missedNodesToKeysMapping Full node mapping.
+     */
+    protected boolean checkRetryPermits(
+        KeyCacheObject key,
+        ClusterNode node,
+        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> missedNodesToKeysMapping
+    ) {
+        LinkedHashMap<KeyCacheObject, Boolean> keys = missedNodesToKeysMapping.get(node);
+
+        if (keys != null && keys.containsKey(key)) {
+            if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) {
+                onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
+                    MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
+                    U.toShortString(node) + ", mappings=" + missedNodesToKeysMapping + ']'));
+
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        boolean found = false;
+
+        for (IgniteInternalFuture<Map<K, V>> fut : futures())
+            if (isMini(fut)) {
+                AbstractMiniFuture f = (AbstractMiniFuture)fut;
+
+                if (f.node().id().equals(nodeId)) {
+                    found = true;
+
+                    f.onNodeLeft(new ClusterTopologyCheckedException("Remote node left grid (will retry): " + nodeId));
+                }
+            }
+
+        return found;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onResult(UUID nodeId, GridNearGetResponse res) {
+        for (IgniteInternalFuture<Map<K, V>> fut : futures())
+            if (isMini(fut)) {
+                AbstractMiniFuture f = (AbstractMiniFuture)fut;
+
+                if (f.futureId().equals(res.miniId())) {
+                    assert f.node().id().equals(nodeId);
+
+                    f.onResult(res);
+                }
+            }
+    }
+
+    /**
      * @param part Partition.
      * @param topVer Topology version.
      * @return Exception.
      */
     protected final ClusterTopologyServerNotFoundException serverNotFoundError(int part, AffinityTopologyVersion topVer) {
         return new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-            "(all partition nodes left the grid) [topVer=" + topVer + ", part" + part + ", cache=" + cctx.name() + ']');
+            "(all partition nodes left the grid) [topVer=" + topVer +
+            ", part" + part + ", cache=" + cctx.name() + ", localNodeId=" + cctx.localNodeId() + ']');
+    }
+
+    /**
+     * @param f Future.
+     * @return {@code True} if mini-future.
+     */
+    protected abstract boolean isMini(IgniteInternalFuture<?> f);
+
+    /**
+     * @param keys Collection of mapping keys.
+     * @param mapped Previous mapping.
+     * @param topVer Topology version.
+     */
+    protected abstract void map(
+        Collection<KeyCacheObject> keys,
+        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
+        AffinityTopologyVersion topVer
+    );
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        Collection<String> futuresStrings = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
+            @SuppressWarnings("unchecked")
+            @Override public String apply(IgniteInternalFuture<?> f) {
+                if (isMini(f)) {
+                    AbstractMiniFuture mini = (AbstractMiniFuture)f;
+
+                    return "miniFuture([futId=" + mini.futureId() + ", node=" + mini.node().id() +
+                        ", loc=" + mini.node().isLocal() +
+                        ", done=" + f.isDone() + "])";
+                }
+                else
+                    return f.getClass().getSimpleName() + " [loc=true, done=" + f.isDone() + "]";
+            }
+        });
+
+        return S.toString(CacheDistributedGetFutureAdapter.class, this,
+            "innerFuts", futuresStrings,
+            "super", super.toString());
+    }
+
+    /**
+     * Mini-future for get operations. Mini-futures are only waiting on a single
+     * node as opposed to multiple nodes.
+     */
+    protected abstract class AbstractMiniFuture extends GridFutureAdapter<Map<K, V>> {
+        /** Mini-future id. */
+        private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+        /** Mapped node. */
+        protected final ClusterNode node;
+
+        /** Mapped keys. */
+        @GridToStringInclude
+        protected final LinkedHashMap<KeyCacheObject, Boolean> keys;
+
+        /** Topology version on which this future was mapped. */
+        protected final AffinityTopologyVersion topVer;
+
+        /** Post processing closure. */
+        private final IgniteInClosure<Collection<GridCacheEntryInfo>> postProcessingClos;
+
+        /** {@code True} if remapped after node left. */
+        private boolean remapped;
+
+        /**
+         * @param node Node.
+         * @param keys Keys.
+         * @param topVer Topology version.
+         */
+        protected AbstractMiniFuture(
+            ClusterNode node,
+            LinkedHashMap<KeyCacheObject, Boolean> keys,
+            AffinityTopologyVersion topVer
+        ) {
+            this.node = node;
+            this.keys = keys;
+            this.topVer = topVer;
+            this.postProcessingClos = CU.createBackupPostProcessingClosure(
+                topVer, log, cctx, null, expiryPlc, readThrough, skipVals);
+        }
+
+        /**
+         * @return Future ID.
+         */
+        public IgniteUuid futureId() {
+            return futId;
+        }
+
+        /**
+         * @return Mapped node.
+         */
+        public ClusterNode node() {
+            return node;
+        }
+
+        /**
+         * @return Keys.
+         */
+        public Collection<KeyCacheObject> keys() {
+            return keys.keySet();
+        }
+
+        /**
+         * Factory method for generating the request associated with this mini-future.
+         *
+         * @param rootFutId Root future id.
+         * @return Near get request.
+         */
+        public GridNearGetRequest createGetRequest(IgniteUuid rootFutId) {
+            return createGetRequest0(rootFutId, futureId());
+        }
+
+        /**
+         * @param rootFutId Root future id.
+         * @param futId Mini future id.
+         * @return Near get request.
+         */
+        protected abstract GridNearGetRequest createGetRequest0(IgniteUuid rootFutId, IgniteUuid futId);
+
+        /**
+         * @param entries Collection of entries.
+         * @return Map with key value results.
+         */
+        protected abstract Map<K, V> createResultMap(Collection<GridCacheEntryInfo> entries);
+
+        /**
+         * @param e Error.
+         */
+        public void onResult(Throwable e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
+
+            // Fail.
+            onDone(e);
+        }
+
+        /**
+         * @param e Failure exception.
+         */
+       public synchronized void onNodeLeft(ClusterTopologyCheckedException e) {
+            if (remapped)
+                return;
+
+            remapped = true;
+
+            if (log.isDebugEnabled())
+                log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this);
+
+            // Try getting from existing nodes.
+            if (!canRemap) {
+                map(keys.keySet(), F.t(node, keys), topVer);
+
+                onDone(Collections.<K, V>emptyMap());
+            }
+            else {
+                long maxTopVer = Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion());
+
+                AffinityTopologyVersion awaitTopVer = new AffinityTopologyVersion(maxTopVer);
+
+                cctx.shared().exchange()
+                    .affinityReadyFuture(awaitTopVer)
+                    .listen((f) -> {
+                            try {
+                                // Remap.
+                                map(keys.keySet(), F.t(node, keys), f.get());
+
+                                onDone(Collections.<K, V>emptyMap());
+                            }
+                            catch (IgniteCheckedException ex) {
+                                CacheDistributedGetFutureAdapter.this.onDone(ex);
+                            }
+                        }
+                    );
+            }
+        }
+
+        /**
+         * @param res Get response.
+         */
+        public void onResult(GridNearGetResponse res) {
+            // If error happened on remote node, fail the whole future.
+            if (res.error() != null) {
+                onDone(res.error());
+
+                return;
+            }
+
+            Collection<Integer> invalidParts = res.invalidPartitions();
+
+            // Remap invalid partitions.
+            if (!F.isEmpty(invalidParts)) {
+                AffinityTopologyVersion rmtTopVer = res.topologyVersion();
+
+                for (Integer part : invalidParts)
+                    addNodeAsInvalid(node, part, topVer);
+
+                if (log.isDebugEnabled())
+                    log.debug("Remapping mini get future [invalidParts=" + invalidParts + ", fut=" + this + ']');
+
+                if (!canRemap) {
+                    map(F.view(keys.keySet(), new P1<KeyCacheObject>() {
+                        @Override public boolean apply(KeyCacheObject key) {
+                            return invalidParts.contains(cctx.affinity().partition(key));
+                        }
+                    }), F.t(node, keys), topVer);
+
+                    postProcessResult(res);
+
+                    onDone(createResultMap(res.entries()));
+
+                    return;
+                }
+
+                // Remap after the remote topology version is finished locally.
+                cctx.shared().exchange().affinityReadyFuture(rmtTopVer)
+                    .listen(new CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void applyx(
+                            IgniteInternalFuture<AffinityTopologyVersion> fut
+                        ) throws IgniteCheckedException {
+                            AffinityTopologyVersion topVer = fut.get();
+
+                            // This will append new futures to compound list.
+                            map(F.view(keys.keySet(), new P1<KeyCacheObject>() {
+                                @Override public boolean apply(KeyCacheObject key) {
+                                    return invalidParts.contains(cctx.affinity().partition(key));
+                                }
+                            }), F.t(node, keys), topVer);
+
+                            postProcessResult(res);
+
+                            onDone(createResultMap(res.entries()));
+                        }
+                    });
+            }
+            else {
+                try {
+                    postProcessResult(res);
+
+                    onDone(createResultMap(res.entries()));
+                }
+                catch (Exception e) {
+                    onDone(e);
+                }
+            }
+        }
+
+        /**
+         * @param res Response.
+         */
+        protected void postProcessResult(final GridNearGetResponse res) {
+            if (postProcessingClos != null)
+                postProcessingClos.apply(res.entries());
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(AbstractMiniFuture.class, this);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/18aaee03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 17bf4ca..611c87c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -969,12 +969,14 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                                 res0 = info.value();
                         }
 
-                        res = new GridNearSingleGetResponse(ctx.cacheId(),
+                        res = new GridNearSingleGetResponse(
+                            ctx.cacheId(),
                             req.futureId(),
                             null,
                             res0,
                             false,
-                            req.addDeploymentInfo());
+                            req.addDeploymentInfo()
+                        );
 
                         if (info != null && req.skipValues())
                             res.setContainsValue();
@@ -982,15 +984,14 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                     else {
                         AffinityTopologyVersion topVer = ctx.shared().exchange().lastTopologyFuture().initialVersion();
 
-                        assert topVer.compareTo(req.topologyVersion()) > 0 : "Wrong ready topology version for " +
-                            "invalid partitions response [topVer=" + topVer + ", req=" + req + ']';
-
-                        res = new GridNearSingleGetResponse(ctx.cacheId(),
+                        res = new GridNearSingleGetResponse(
+                            ctx.cacheId(),
                             req.futureId(),
                             topVer,
                             null,
                             true,
-                            req.addDeploymentInfo());
+                            req.addDeploymentInfo()
+                        );
                     }
                 }
                 catch (NodeStoppingException ignored) {
@@ -1075,8 +1076,11 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                     res.error(e);
                 }
 
-                if (!F.isEmpty(fut.invalidPartitions()))
-                    res.invalidPartitions(fut.invalidPartitions(), ctx.shared().exchange().lastTopologyFuture().initialVersion());
+                if (!F.isEmpty(fut.invalidPartitions())){
+                    AffinityTopologyVersion topVer = ctx.shared().exchange().lastTopologyFuture().initialVersion();
+
+                    res.invalidPartitions(fut.invalidPartitions(), topVer);
+                }
 
                 try {
                     ctx.io().send(nodeId, res, ctx.ioPolicy());

http://git-wip-us.apache.org/repos/asf/ignite/blob/18aaee03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index e0fe8be..8588084 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -213,7 +213,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
      *
      */
     private void map() {
-        // TODO get rid of force keys request https://issues.apache.org/jira/browse/IGNITE-10251
+        // TODO Get rid of force keys request https://issues.apache.org/jira/browse/IGNITE-10251.
         if (cctx.group().preloader().needForceKeys()) {
             GridDhtFuture<Object> fut = cctx.group().preloader().request(
                 cctx,

http://git-wip-us.apache.org/repos/asf/ignite/blob/18aaee03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 2fcd677..0629754 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -23,10 +23,9 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -43,7 +42,6 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTrackerImpl;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
@@ -52,17 +50,10 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLeanMap;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -71,17 +62,9 @@ import org.jetbrains.annotations.Nullable;
  */
 public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V>
     implements MvccSnapshotResponseListener {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Logger reference. */
-    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
-    /** Logger. */
-    private static IgniteLogger log;
 
     /** Transaction label. */
-    private String txLbl;
+    protected final String txLbl;
 
     /** */
     protected final MvccSnapshot mvccSnapshot;
@@ -122,7 +105,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         @Nullable String txLbl,
         @Nullable MvccSnapshot mvccSnapshot
     ) {
-        super(cctx,
+        super(
+            cctx,
             keys,
             readThrough,
             forcePrimary,
@@ -133,15 +117,16 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
             skipVals,
             needVer,
             keepCacheObjects,
-            recovery);
+            recovery
+        );
+
         assert mvccSnapshot == null || cctx.mvccEnabled();
 
         this.mvccSnapshot = mvccSnapshot;
 
         this.txLbl = txLbl;
 
-        if (log == null)
-            log = U.logger(cctx.kernalContext(), logRef, GridPartitionedGetFuture.class);
+        initLogger(GridPartitionedGetFuture.class);
     }
 
     /**
@@ -169,14 +154,15 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
     public void init(AffinityTopologyVersion topVer) {
         AffinityTopologyVersion lockedTopVer = cctx.shared().lockedTopologyVersion(null);
 
+        // Cannot remap if we are in a transaction and locked on some topology.
         if (lockedTopVer != null) {
             topVer = lockedTopVer;
 
             canRemap = false;
         }
-        else {
-            topVer = topVer.topologyVersion() > 0 ? topVer :
-                canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
+        else{
+            // Use affinity topology version if constructor version is not specified.
+            topVer = topVer.topologyVersion() > 0 ? topVer : cctx.affinity().affinityTopologyVersion();
         }
 
         if (!cctx.mvccEnabled() || mvccSnapshot != null)
@@ -184,84 +170,33 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         else {
             mvccTracker = new MvccQueryTrackerImpl(cctx, canRemap);
 
-            trackable = true;
-
-            cctx.mvcc().addFuture(this, futId);
+            registrateFutureInMvccManager(this);
 
             mvccTracker.requestSnapshot(topVer, this);
         }
     }
 
-    @Override public void onResponse(MvccSnapshot res) {
-        AffinityTopologyVersion topVer = mvccTracker.topologyVersion();
-
-        assert topVer != null;
-
-        initialMap(topVer);
-    }
-
-    @Override public void onError(IgniteCheckedException e) {
-        onDone(e);
-    }
-
     /**
      * @param topVer Topology version.
      */
     private void initialMap(AffinityTopologyVersion topVer) {
-        map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
+        map(keys, Collections.emptyMap(), topVer);
 
         markInitialized();
     }
 
     /** {@inheritDoc} */
-    @Override public boolean trackable() {
-        return trackable;
-    }
+    @Override public void onResponse(MvccSnapshot res) {
+        AffinityTopologyVersion topVer = mvccTracker.topologyVersion();
 
-    /** {@inheritDoc} */
-    @Override public void markNotTrackable() {
-        // Should not flip trackable flag from true to false since get future can be remapped.
-    }
+        assert topVer != null;
 
-    /** {@inheritDoc} */
-    @Override public IgniteUuid futureId() {
-        return futId;
+        initialMap(topVer);
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onNodeLeft(UUID nodeId) {
-        boolean found = false;
-
-        for (IgniteInternalFuture<Map<K, V>> fut : futures())
-            if (isMini(fut)) {
-                MiniFuture f = (MiniFuture)fut;
-
-                if (f.node().id().equals(nodeId)) {
-                    found = true;
-
-                    f.onNodeLeft(new ClusterTopologyCheckedException("Remote node left grid (will retry): " + nodeId));
-                }
-            }
-
-        return found;
-    }
-
-    /**
-     * @param nodeId Sender.
-     * @param res Result.
-     */
-    @Override public void onResult(UUID nodeId, GridNearGetResponse res) {
-        for (IgniteInternalFuture<Map<K, V>> fut : futures()) {
-            if (isMini(fut)) {
-                MiniFuture f = (MiniFuture)fut;
-
-                if (f.futureId().equals(res.miniId())) {
-                    assert f.node().id().equals(nodeId);
-
-                    f.onResult(res);
-                }
-            }
-        }
+    @Override public void onError(IgniteCheckedException e) {
+        onDone(e);
     }
 
     /** {@inheritDoc} */
@@ -270,6 +205,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
             if (trackable)
                 cctx.mvcc().removeFuture(futId);
 
+            MvccQueryTracker mvccTracker = this.mvccTracker;
+
             if (mvccTracker != null)
                 mvccTracker.onDone();
 
@@ -281,11 +218,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         return false;
     }
 
-    /**
-     * @param f Future.
-     * @return {@code True} if mini-future.
-     */
-    private boolean isMini(IgniteInternalFuture<?> f) {
+    /** {@inheritDoc} */
+    @Override protected boolean isMini(IgniteInternalFuture<?> f) {
         return f.getClass().equals(MiniFuture.class);
     }
 
@@ -294,68 +228,60 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
      * @param mapped Mappings to check for duplicates.
      * @param topVer Topology version on which keys should be mapped.
      */
-    private void map(
+    @Override protected void map(
         Collection<KeyCacheObject> keys,
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
-        final AffinityTopologyVersion topVer
+        AffinityTopologyVersion topVer
     ) {
         Collection<ClusterNode> cacheNodes = CU.affinityNodes(cctx, topVer);
 
-        if (cacheNodes.isEmpty()) {
-            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'));
-
-            return;
-        }
-
-        GridDhtTopologyFuture topFut = cctx.shared().exchange().lastFinishedFuture();
-
-        Throwable err = topFut != null ? topFut.validateCache(cctx, recovery, true, null, keys) : null;
-
-        if (err != null) {
-            onDone(err);
+        validate(cacheNodes, topVer);
 
+        // Future can be already done with some exception.
+        if (isDone())
             return;
-        }
 
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings = U.newHashMap(cacheNodes.size());
 
-        final int keysSize = keys.size();
+        int keysSize = keys.size();
 
+        // Map for local (key,value) pairs.
         Map<K, V> locVals = U.newHashMap(keysSize);
 
+        // True if we have remote nodes after key mapping complete.
         boolean hasRmtNodes = false;
 
-        // Assign keys to primary nodes.
+        // Assign keys to nodes.
         for (KeyCacheObject key : keys)
-            hasRmtNodes |= map(key, mappings, locVals, topVer, mapped);
+            hasRmtNodes |= map(key, topVer, mappings, mapped, locVals);
 
+        // Future can be already done with some exception.
         if (isDone())
             return;
 
+        // Add local read (key,value) in result.
         if (!locVals.isEmpty())
             add(new GridFinishedFuture<>(locVals));
 
-        if (hasRmtNodes) {
-            if (!trackable) {
-                trackable = true;
-
-                cctx.mvcc().addFuture(this, futId);
-            }
-        }
+        // If we have remote nodes in mapping we should register future in mvcc manager.
+        if (hasRmtNodes)
+            registrateFutureInMvccManager(this);
 
-        // Create mini futures.
+        // Create mini futures after mapping to remote nodes.
         for (Map.Entry<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> entry : mappings.entrySet()) {
-            final ClusterNode n = entry.getKey();
+            // Node for request.
+            ClusterNode n = entry.getKey();
 
-            final LinkedHashMap<KeyCacheObject, Boolean> mappedKeys = entry.getValue();
+            // Keys for request.
+            LinkedHashMap<KeyCacheObject, Boolean> mappedKeys = entry.getValue();
 
             assert !mappedKeys.isEmpty();
 
             // If this is the primary or backup node for the keys.
             if (n.isLocal()) {
-                final GridDhtFuture<Collection<GridCacheEntryInfo>> fut =
-                    cache().getDhtAsync(n.id(),
+                GridDhtFuture<Collection<GridCacheEntryInfo>> fut = cache()
+                    .getDhtAsync(
+                        n.id(),
                         -1,
                         mappedKeys,
                         false,
@@ -367,68 +293,50 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                         skipVals,
                         recovery,
                         txLbl,
-                        mvccSnapshot());
+                        mvccSnapshot()
+                    );
 
-                final Collection<Integer> invalidParts = fut.invalidPartitions();
+                Collection<Integer> invalidParts = fut.invalidPartitions();
 
                 if (!F.isEmpty(invalidParts)) {
                     Collection<KeyCacheObject> remapKeys = new ArrayList<>(keysSize);
 
                     for (KeyCacheObject key : keys) {
-                        if (key != null && invalidParts.contains(cctx.affinity().partition(key)))
+                        int part = cctx.affinity().partition(key);
+
+                        if (key != null && invalidParts.contains(part)) {
+                            addNodeAsInvalid(n, part, topVer);
+
                             remapKeys.add(key);
+                        }
                     }
 
                     AffinityTopologyVersion updTopVer = cctx.shared().exchange().readyAffinityVersion();
 
-                    assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " +
-                        "not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
-                        ", invalidParts=" + invalidParts + ']';
-
                     // Remap recursively.
                     map(remapKeys, mappings, updTopVer);
                 }
 
                 // Add new future.
-                add(fut.chain(new C1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>, Map<K, V>>() {
-                    @Override public Map<K, V> apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut) {
-                        try {
-                            return createResultMap(fut.get());
-                        }
-                        catch (Exception e) {
-                            U.error(log, "Failed to get values from dht cache [fut=" + fut + "]", e);
+                add(fut.chain(f -> {
+                    try {
+                        return createResultMap(f.get());
+                    }
+                    catch (Exception e) {
+                        U.error(log, "Failed to get values from dht cache [fut=" + fut + "]", e);
 
-                            onDone(e);
+                        onDone(e);
 
-                            return Collections.emptyMap();
-                        }
+                        return Collections.emptyMap();
                     }
                 }));
             }
             else {
-                MiniFuture fut = new MiniFuture(n, mappedKeys, topVer,
-                    CU.createBackupPostProcessingClosure(topVer, log, cctx, null, expiryPlc, readThrough, skipVals));
-
-                GridCacheMessage req = new GridNearGetRequest(
-                    cctx.cacheId(),
-                    futId,
-                    fut.futureId(),
-                    null,
-                    mappedKeys,
-                    readThrough,
-                    topVer,
-                    subjId,
-                    taskName == null ? 0 : taskName.hashCode(),
-                    expiryPlc != null ? expiryPlc.forCreate() : -1L,
-                    expiryPlc != null ? expiryPlc.forAccess() : -1L,
-                    false,
-                    skipVals,
-                    cctx.deploymentEnabled(),
-                    recovery,
-                    txLbl,
-                    mvccSnapshot());
+                MiniFuture miniFut = new MiniFuture(n, mappedKeys, topVer);
 
-                add(fut); // Append new future.
+                GridCacheMessage req = miniFut.createGetRequest(futId);
+
+                add(miniFut); // Append new future.
 
                 try {
                     cctx.io().send(n, req, cctx.ioPolicy());
@@ -436,83 +344,118 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                 catch (IgniteCheckedException e) {
                     // Fail the whole thing.
                     if (e instanceof ClusterTopologyCheckedException)
-                        fut.onNodeLeft((ClusterTopologyCheckedException)e);
+                        miniFut.onNodeLeft((ClusterTopologyCheckedException)e);
                     else
-                        fut.onResult(e);
+                        miniFut.onResult(e);
                 }
             }
         }
     }
 
     /**
-     * @param mappings Mappings.
+     * @param nodesToKeysMapping Mappings.
      * @param key Key to map.
      * @param locVals Local values.
      * @param topVer Topology version.
-     * @param mapped Previously mapped.
+     * @param missedNodesToKeysMapping Previously mapped.
      * @return {@code True} if has remote nodes.
      */
     private boolean map(
         KeyCacheObject key,
-        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings,
-        Map<K, V> locVals,
         AffinityTopologyVersion topVer,
-        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped
+        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> nodesToKeysMapping,
+        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> missedNodesToKeysMapping,
+        Map<K, V> locVals
     ) {
         int part = cctx.affinity().partition(key);
 
         List<ClusterNode> affNodes = cctx.affinity().nodesByPartition(part, topVer);
 
+        // Fail if no affinity node is found.
         if (affNodes.isEmpty()) {
             onDone(serverNotFoundError(part, topVer));
 
             return false;
         }
 
-        // Local get cannot be used with MVCC as local node can contain some visible version which is not latest.
-        boolean fastLocGet = !cctx.mvccEnabled() && (!forcePrimary || affNodes.get(0).isLocal()) &&
-            cctx.reserveForFastLocalGet(part, topVer);
+        // Try to read key locally if we can.
+        if (tryLocalGet(key, part, topVer, affNodes, locVals))
+            return false;
 
-        if (fastLocGet) {
-            try {
-                if (localGet(topVer, key, part, locVals))
-                    return false;
-            }
-            finally {
-                cctx.releaseForFastLocalGet(part, topVer);
-            }
-        }
+        Set<ClusterNode> invalidNodeSet = getInvalidNodes(part, topVer);
 
-        ClusterNode node = cctx.selectAffinityNodeBalanced(affNodes, part, canRemap);
+        // Get remote node for request for this key.
+        ClusterNode node = cctx.selectAffinityNodeBalanced(affNodes, invalidNodeSet, part, canRemap);
 
+        // Fail if no remote node is found.
         if (node == null) {
             onDone(serverNotFoundError(part, topVer));
 
             return false;
         }
 
+        // The node still can be local, see implementation details of #tryLocalGet().
         boolean remote = !node.isLocal();
 
-        LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(node);
+        // Check retry counter, bounded to avoid infinite remap.
+        if (!checkRetryPermits(key, node, missedNodesToKeysMapping))
+            return false;
 
-        if (keys != null && keys.containsKey(key)) {
-            if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) {
-                onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
-                    MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
-                    U.toShortString(node) + ", mappings=" + mapped + ']'));
+        addNodeMapping(key, node, nodesToKeysMapping);
 
-                return false;
-            }
-        }
+        return remote;
+    }
 
+    /**
+     *
+     * @param key Key.
+     * @param node Mapped node.
+     * @param mappings Full node mapping.
+     */
+    private void addNodeMapping(
+        KeyCacheObject key,
+        ClusterNode node,
+        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings
+    ) {
         LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(node);
 
         if (old == null)
             mappings.put(node, old = new LinkedHashMap<>(3, 1f));
 
         old.put(key, false);
+    }
 
-        return remote;
+    /**
+     *
+     * @param key Key.
+     * @param part Partition.
+     * @param topVer Topology version.
+     * @param affNodes Affinity nodes.
+     * @param locVals Map for local (key,value) pairs.
+     */
+    private boolean tryLocalGet(
+        KeyCacheObject key,
+        int part,
+        AffinityTopologyVersion topVer,
+        List<ClusterNode> affNodes,
+        Map<K, V> locVals
+    ) {
+        // Local get cannot be used with MVCC as local node can contain some visible version which is not latest.
+        boolean fastLocGet = !cctx.mvccEnabled() &&
+            (!forcePrimary || affNodes.get(0).isLocal()) &&
+            cctx.reserveForFastLocalGet(part, topVer);
+
+        if (fastLocGet) {
+            try {
+                if (localGet(topVer, key, part, locVals))
+                    return true;
+            }
+            finally {
+                cctx.releaseForFastLocalGet(part, topVer);
+            }
+        }
+
+        return false;
     }
 
     /**
@@ -662,6 +605,27 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
     }
 
     /**
+     *
+     * @param cacheNodes Cache affinity nodes.
+     * @param topVer Topology version.
+     */
+    private void validate(Collection<ClusterNode> cacheNodes, AffinityTopologyVersion topVer) {
+        if (cacheNodes.isEmpty()) {
+            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+                "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'));
+
+            return;
+        }
+
+        GridDhtTopologyFuture topFut = cctx.shared().exchange().lastFinishedFuture();
+
+        Throwable err = topFut != null ? topFut.validateCache(cctx, recovery, true, null, keys) : null;
+
+        if (err != null)
+            onDone(err);
+    }
+
+    /**
      * @return Near cache.
      */
     private GridDhtCacheAdapter<K, V> cache() {
@@ -701,21 +665,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
-            @SuppressWarnings("unchecked")
-            @Override public String apply(IgniteInternalFuture<?> f) {
-                if (isMini(f)) {
-                    return "[node=" + ((MiniFuture)f).node().id() +
-                        ", loc=" + ((MiniFuture)f).node().isLocal() +
-                        ", done=" + f.isDone() + "]";
-                }
-                else
-                    return "[loc=true, done=" + f.isDone() + "]";
-            }
-        });
-
         return S.toString(GridPartitionedGetFuture.class, this,
-            "innerFuts", futs,
             "super", super.toString());
     }
 
@@ -723,197 +673,46 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
      * Mini-future for get operations. Mini-futures are only waiting on a single
      * node as opposed to multiple nodes.
      */
-    private class MiniFuture extends GridFutureAdapter<Map<K, V>> {
-        /** */
-        private final IgniteUuid futId = IgniteUuid.randomUuid();
-
-        /** Node ID. */
-        private final ClusterNode node;
-
-        /** Keys. */
-        @GridToStringInclude
-        private final LinkedHashMap<KeyCacheObject, Boolean> keys;
-
-        /** Topology version on which this future was mapped. */
-        private final AffinityTopologyVersion topVer;
-
-        /** Post processing closure. */
-        private final IgniteInClosure<Collection<GridCacheEntryInfo>> postProcessingClos;
-
-        /** {@code True} if remapped after node left. */
-        private boolean remapped;
-
+    private class MiniFuture extends AbstractMiniFuture {
         /**
          * @param node Node.
          * @param keys Keys.
          * @param topVer Topology version.
-         * @param postProcessingClos Post processing closure.
          */
-        MiniFuture(ClusterNode node, LinkedHashMap<KeyCacheObject, Boolean> keys, AffinityTopologyVersion topVer,
-            @Nullable IgniteInClosure<Collection<GridCacheEntryInfo>> postProcessingClos) {
-            this.node = node;
-            this.keys = keys;
-            this.topVer = topVer;
-            this.postProcessingClos = postProcessingClos;
+        public MiniFuture(
+            ClusterNode node,
+            LinkedHashMap<KeyCacheObject, Boolean> keys,
+            AffinityTopologyVersion topVer
+        ) {
+            super(node, keys, topVer);
         }
 
-        /**
-         * @return Future ID.
-         */
-        IgniteUuid futureId() {
-            return futId;
-        }
-
-        /**
-         * @return Node ID.
-         */
-        public ClusterNode node() {
-            return node;
-        }
-
-        /**
-         * @return Keys.
-         */
-        public Collection<KeyCacheObject> keys() {
-            return keys.keySet();
-        }
-
-        /**
-         * @param e Error.
-         */
-        void onResult(Throwable e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
-
-            // Fail.
-            onDone(e);
-        }
-
-        /**
-         * @param e Failure exception.
-         */
-        synchronized void onNodeLeft(ClusterTopologyCheckedException e) {
-            if (remapped)
-                return;
-
-            remapped = true;
-
-            if (log.isDebugEnabled())
-                log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this);
-
-            // Try getting from existing nodes.
-            if (!canRemap) {
-                map(keys.keySet(), F.t(node, keys), topVer);
-
-                onDone(Collections.<K, V>emptyMap());
-            }
-            else {
-                AffinityTopologyVersion updTopVer =
-                    new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
-
-                cctx.shared().exchange().affinityReadyFuture(updTopVer).listen(
-                    new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                            try {
-                                // Remap.
-                                map(keys.keySet(), F.t(node, keys), fut.get());
-
-                                onDone(Collections.<K, V>emptyMap());
-                            }
-                            catch (IgniteCheckedException e) {
-                                GridPartitionedGetFuture.this.onDone(e);
-                            }
-                        }
-                    }
-                );
-            }
-        }
-
-        /**
-         * @param res Result callback.
-         */
-        void onResult(final GridNearGetResponse res) {
-            final Collection<Integer> invalidParts = res.invalidPartitions();
-
-            // If error happened on remote node, fail the whole future.
-            if (res.error() != null) {
-                onDone(res.error());
-
-                return;
-            }
-
-            // Remap invalid partitions.
-            if (!F.isEmpty(invalidParts)) {
-                AffinityTopologyVersion rmtTopVer = res.topologyVersion();
-
-                assert !rmtTopVer.equals(AffinityTopologyVersion.ZERO);
-
-                if (rmtTopVer.compareTo(topVer) <= 0) {
-                    // Fail the whole get future.
-                    onDone(new IgniteCheckedException("Failed to process invalid partitions response (remote node reported " +
-                        "invalid partitions but remote topology version does not differ from local) " +
-                        "[topVer=" + topVer + ", rmtTopVer=" + rmtTopVer + ", invalidParts=" + invalidParts +
-                        ", nodeId=" + node.id() + ']'));
-
-                    return;
-                }
-
-                if (log.isDebugEnabled())
-                    log.debug("Remapping mini get future [invalidParts=" + invalidParts + ", fut=" + this + ']');
-
-                if (!canRemap) {
-                    map(F.view(keys.keySet(), new P1<KeyCacheObject>() {
-                        @Override public boolean apply(KeyCacheObject key) {
-                            return invalidParts.contains(cctx.affinity().partition(key));
-                        }
-                    }), F.t(node, keys), topVer);
-
-                    postProcessResult(res);
-
-                    onDone(createResultMap(res.entries()));
-
-                    return;
-                }
-
-                // Need to wait for next topology version to remap.
-                IgniteInternalFuture<AffinityTopologyVersion> topFut = cctx.shared().exchange().affinityReadyFuture(rmtTopVer);
-
-                topFut.listen(new CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                    @Override public void applyx(
-                        IgniteInternalFuture<AffinityTopologyVersion> fut) throws IgniteCheckedException {
-                        AffinityTopologyVersion topVer = fut.get();
-
-                        // This will append new futures to compound list.
-                        map(F.view(keys.keySet(), new P1<KeyCacheObject>() {
-                            @Override public boolean apply(KeyCacheObject key) {
-                                return invalidParts.contains(cctx.affinity().partition(key));
-                            }
-                        }), F.t(node, keys), topVer);
-
-                        postProcessResult(res);
-
-                        onDone(createResultMap(res.entries()));
-                    }
-                });
-            }
-            else {
-                try {
-                    postProcessResult(res);
-
-                    onDone(createResultMap(res.entries()));
-                }
-                catch (Exception e) {
-                    onDone(e);
-                }
-            }
+        /** {@inheritDoc} */
+        @Override protected GridNearGetRequest createGetRequest0(IgniteUuid rootFutId, IgniteUuid futId) {
+            return new GridNearGetRequest(
+                cctx.cacheId(),
+                rootFutId,
+                futId,
+                null,
+                keys,
+                readThrough,
+                topVer,
+                subjId,
+                taskName == null ? 0 : taskName.hashCode(),
+                expiryPlc != null ? expiryPlc.forCreate() : -1L,
+                expiryPlc != null ? expiryPlc.forAccess() : -1L,
+                false,
+                skipVals,
+                cctx.deploymentEnabled(),
+                recovery,
+                txLbl,
+                mvccSnapshot()
+            );
         }
 
-        /**
-         * @param res Response.
-         */
-        private void postProcessResult(final GridNearGetResponse res) {
-            if (postProcessingClos != null)
-                postProcessingClos.apply(res.entries());
+        /** {@inheritDoc} */
+        @Override protected Map<K, V> createResultMap(Collection<GridCacheEntryInfo> entries) {
+            return GridPartitionedGetFuture.this.createResultMap(entries);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/18aaee03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 4d0e129..0d69485 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -18,8 +18,12 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -51,8 +55,6 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -61,13 +63,25 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
 
 /**
  *
  */
-public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Object> implements GridCacheFuture<Object>,
-    CacheGetFuture, IgniteDiagnosticAware {
+public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Object>
+    implements CacheGetFuture, IgniteDiagnosticAware {
+    /** Default max remap count value. */
+    public static final int DFLT_MAX_REMAP_CNT = 3;
+
+    /** Maximum number of attempts to remap key to the same primary node. */
+    protected static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT);
+
+    /** Remap count updater. */
+    protected static final AtomicIntegerFieldUpdater<GridPartitionedSingleGetFuture> REMAP_CNT_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(GridPartitionedSingleGetFuture.class, "remapCnt");
+
     /** Logger reference. */
     private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
 
@@ -133,7 +147,13 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
     private volatile BackupPostProcessingClosure postProcessingClos;
 
     /** Transaction label. */
-    private String txLbl;
+    private final String txLbl;
+
+    /** Invalid mapped nodes. */
+    private Set<ClusterNode> invalidNodes = Collections.emptySet();
+
+    /** Remap count. */
+    protected volatile int remapCnt;
 
     /**
      * @param cctx Context.
@@ -204,30 +224,30 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
     }
 
     /**
-     *
+     * Initialize future.
      */
     public void init() {
-        AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer :
-            canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
-
-        GridDhtTopologyFuture topFut = cctx.shared().exchange().lastFinishedFuture();
-
-        Throwable err = topFut != null ? topFut.validateCache(cctx, recovery, true, key, null) : null;
+        AffinityTopologyVersion mappingtopVermappingtopVer;
 
-        if (err != null) {
-            onDone(err);
-
-            return;
+        if (topVer.topologyVersion() > 0)
+            mappingtopVermappingtopVer = topVer;
+        else {
+            mappingtopVermappingtopVer = canRemap ?
+                cctx.affinity().affinityTopologyVersion() :
+                cctx.shared().exchange().readyAffinityVersion();
         }
 
-        map(topVer);
+        map(mappingtopVermappingtopVer);
     }
 
     /**
      * @param topVer Topology version.
      */
     @SuppressWarnings("unchecked")
-    private void map(final AffinityTopologyVersion topVer) {
+    private void map(AffinityTopologyVersion topVer) {
+        if (!validate(topVer))
+            return;
+
         ClusterNode node = mapKeyToNode(topVer);
 
         if (node == null) {
@@ -239,46 +259,46 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
         if (isDone())
             return;
 
+        // Read value if node is localNode.
         if (node.isLocal()) {
-            final GridDhtFuture<GridCacheEntryInfo> fut = cctx.dht().getDhtSingleAsync(node.id(),
-                -1,
-                key,
-                false,
-                readThrough,
-                topVer,
-                subjId,
-                taskName == null ? 0 : taskName.hashCode(),
-                expiryPlc,
-                skipVals,
-                recovery,
-                txLbl,
-                mvccSnapshot);
-
-            final Collection<Integer> invalidParts = fut.invalidPartitions();
+            GridDhtFuture<GridCacheEntryInfo> fut = cctx.dht()
+                .getDhtSingleAsync(
+                    node.id(),
+                    -1,
+                    key,
+                    false,
+                    readThrough,
+                    topVer,
+                    subjId,
+                    taskName == null ? 0 : taskName.hashCode(),
+                    expiryPlc,
+                    skipVals,
+                    recovery,
+                    txLbl,
+                    mvccSnapshot
+                );
+
+            Collection<Integer> invalidParts = fut.invalidPartitions();
 
             if (!F.isEmpty(invalidParts)) {
-                AffinityTopologyVersion updTopVer = cctx.shared().exchange().readyAffinityVersion();
+                addNodeAsInvalid(node);
 
-                assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology " +
-                    "version did not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
-                    ", invalidParts=" + invalidParts + ']';
+                AffinityTopologyVersion updTopVer = cctx.shared().exchange().readyAffinityVersion();
 
                 // Remap recursively.
                 map(updTopVer);
             }
             else {
-                fut.listen(new CI1<IgniteInternalFuture<GridCacheEntryInfo>>() {
-                    @Override public void apply(IgniteInternalFuture<GridCacheEntryInfo> fut) {
-                        try {
-                            GridCacheEntryInfo info = fut.get();
+                fut.listen(f -> {
+                    try {
+                        GridCacheEntryInfo info = f.get();
 
-                            setResult(info);
-                        }
-                        catch (Exception e) {
-                            U.error(log, "Failed to get values from dht cache [fut=" + fut + "]", e);
+                        setResult(info);
+                    }
+                    catch (Exception e) {
+                        U.error(log, "Failed to get values from dht cache [fut=" + fut + "]", e);
 
-                            onDone(e);
-                        }
+                        onDone(e);
                     }
                 });
             }
@@ -291,15 +311,11 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                 this.node = node;
             }
 
-            if (!trackable) {
-                trackable = true;
-
-                cctx.mvcc().addFuture(this, futId);
-            }
+            registrateFutureInMvccManager(this);
 
             boolean needVer = this.needVer;
 
-            final BackupPostProcessingClosure postClos = CU.createBackupPostProcessingClosure(topVer, log,
+            BackupPostProcessingClosure postClos = CU.createBackupPostProcessingClosure(topVer, log,
                 cctx, key, expiryPlc, readThrough, skipVals);
 
             if (postClos != null) {
@@ -309,7 +325,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                 postProcessingClos = postClos;
             }
 
-            GridCacheMessage req = new GridNearSingleGetRequest(cctx.cacheId(),
+            GridCacheMessage req = new GridNearSingleGetRequest(
+                cctx.cacheId(),
                 futId.localId(),
                 key,
                 readThrough,
@@ -324,7 +341,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                 cctx.deploymentEnabled(),
                 recovery,
                 txLbl,
-                mvccSnapshot);
+                mvccSnapshot
+            );
 
             try {
                 cctx.io().send(node, req, cctx.ioPolicy());
@@ -347,35 +365,58 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
 
         List<ClusterNode> affNodes = cctx.affinity().nodesByPartition(part, topVer);
 
+        // Fail if no affinity node is found by assignment.
         if (affNodes.isEmpty()) {
             onDone(serverNotFoundError(part, topVer));
 
             return null;
         }
 
+        // Try to read key locally if we can.
+        if (tryLocalGet(key, part, topVer, affNodes))
+            return null;
+
+        ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, getInvalidNodes(), part, canRemap);
+
+        // Fail if no balanced node is found.
+        if (affNode == null) {
+            onDone(serverNotFoundError(part, topVer));
+
+            return null;
+        }
+
+        return affNode;
+    }
+
+    /**
+     *
+     * @param key Key.
+     * @param part Partition.
+     * @param topVer Topology version.
+     * @param affNodes Affinity nodes.
+     */
+    private boolean tryLocalGet(
+        KeyCacheObject key,
+        int part,
+        AffinityTopologyVersion topVer,
+        List<ClusterNode> affNodes
+    ) {
         // Local get cannot be used with MVCC as local node can contain some visible version which is not latest.
-        boolean fastLocGet = !cctx.mvccEnabled() && (!forcePrimary || affNodes.get(0).isLocal()) &&
+        boolean fastLocGet = !cctx.mvccEnabled() &&
+            (!forcePrimary || affNodes.get(0).isLocal()) &&
             cctx.reserveForFastLocalGet(part, topVer);
 
         if (fastLocGet) {
             try {
-                if (localGet(topVer, part))
-                    return null;
+                if (localGet(topVer, key, part))
+                    return true;
             }
             finally {
                 cctx.releaseForFastLocalGet(part, topVer);
             }
         }
 
-        ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, part, canRemap);
-
-        if (affNode == null) {
-            onDone(serverNotFoundError(part, topVer));
-
-            return null;
-        }
-
-        return affNode;
+        return false;
     }
 
     /**
@@ -383,7 +424,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
      * @param part Partition.
      * @return {@code True} if future completed.
      */
-    private boolean localGet(AffinityTopologyVersion topVer, int part) {
+    private boolean localGet(AffinityTopologyVersion topVer, KeyCacheObject key, int part) {
         assert cctx.affinityNode() : this;
 
         GridDhtCacheAdapter colocated = cctx.dht();
@@ -522,12 +563,28 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
     }
 
     /**
+     * @param fut Future.
+     */
+    private void registrateFutureInMvccManager(GridCacheFuture<?> fut) {
+        if (!trackable) {
+            trackable = true;
+
+            cctx.mvcc().addFuture(fut, futId);
+        }
+    }
+
+    /**
      * @param nodeId Node ID.
      * @param res Result.
      */
     public void onResult(UUID nodeId, GridNearSingleGetResponse res) {
-        if (!processResponse(nodeId) ||
-            !checkError(res.error(), res.invalidPartitions(), res.topologyVersion(), nodeId))
+        // Break here if the response is from an unexpected node.
+        if (!processResponse(nodeId))
+            return;
+
+        // Break here if an exception was thrown on the remote node or
+        // the partition on the remote node is invalid.
+        if (!checkError(nodeId, res.invalidPartitions(), res.topologyVersion(), res.error()))
             return;
 
         Message res0 = res.result();
@@ -562,13 +619,15 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
         }
     }
 
-    /**
-     * @param nodeId Node ID.
-     * @param res Result.
-     */
+    /** {@inheritDoc} */
     @Override public void onResult(UUID nodeId, GridNearGetResponse res) {
-        if (!processResponse(nodeId) ||
-            !checkError(res.error(), !F.isEmpty(res.invalidPartitions()), res.topologyVersion(), nodeId))
+        // Break here if the response is from an unexpected node.
+        if (!processResponse(nodeId))
+            return;
+
+        // Break here if an exception was thrown on the remote node or
+        // the partition on the remote node is invalid.
+        if (!checkError(nodeId, !F.isEmpty(res.invalidPartitions()), res.topologyVersion(), res.error()))
             return;
 
         Collection<GridCacheEntryInfo> infos = res.entries();
@@ -601,10 +660,12 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
      * @param nodeId Node ID.
      * @return {@code True} if should process received response.
      */
-    private boolean checkError(@Nullable IgniteCheckedException err,
+    private boolean checkError(
+        UUID nodeId,
         boolean invalidParts,
         AffinityTopologyVersion rmtTopVer,
-        UUID nodeId) {
+        @Nullable IgniteCheckedException err
+    ) {
         if (err != null) {
             onDone(err);
 
@@ -612,34 +673,10 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
         }
 
         if (invalidParts) {
-            assert !rmtTopVer.equals(AffinityTopologyVersion.ZERO);
-
-            if (rmtTopVer.compareTo(topVer) <= 0) {
-                // Fail the whole get future.
-                onDone(new IgniteCheckedException("Failed to process invalid partitions response (remote node reported " +
-                    "invalid partitions but remote topology version does not differ from local) " +
-                    "[topVer=" + topVer + ", rmtTopVer=" + rmtTopVer + ", part=" + cctx.affinity().partition(key) +
-                    ", nodeId=" + nodeId + ']'));
-
-                return false;
-            }
+            addNodeAsInvalid(cctx.node(nodeId));
 
             if (canRemap) {
-                IgniteInternalFuture<AffinityTopologyVersion> topFut = cctx.shared().exchange().affinityReadyFuture(rmtTopVer);
-
-                topFut.listen(new CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                    @Override public void applyx(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                        try {
-                            AffinityTopologyVersion topVer = fut.get();
-
-                            remap(topVer);
-                        }
-                        catch (IgniteCheckedException e) {
-                            onDone(e);
-                        }
-                    }
-                });
-
+                awaitVersionAndRemap(rmtTopVer);
             }
             else
                 map(topVer);
@@ -728,6 +765,43 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
     }
 
     /**
+     * @param node Invalid node.
+     */
+    private synchronized void addNodeAsInvalid(ClusterNode node) {
+        if (invalidNodes == Collections.<ClusterNode>emptySet())
+            invalidNodes = new HashSet<>();
+
+        invalidNodes.add(node);
+    }
+
+    /**
+     * @return Set of invalid cluster nodes.
+     */
+    protected synchronized Set<ClusterNode> getInvalidNodes() {
+        return invalidNodes;
+    }
+
+    /**
+     * @param topVer Topology version.
+     */
+    private boolean checkRetryPermits(AffinityTopologyVersion topVer) {
+        if (topVer.equals(this.topVer))
+            return true;
+
+        if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) {
+            ClusterNode node0 = node;
+
+            onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
+                MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
+                (node0 != null ? U.toShortString(node0) : node0) + ", invalidNodes=" + invalidNodes + ']'));
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
      * @param part Partition.
      * @param topVer Topology version.
      * @return Exception.
@@ -737,6 +811,27 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
             "(all partition nodes left the grid) [topVer=" + topVer + ", part=" + part + ", cache=" + cctx.name() + ']');
     }
 
+    /**
+     * @param topVer Topology version.
+     * @return {@code True} if validation succeeded, {@code false} otherwise.
+     */
+    private boolean validate(AffinityTopologyVersion topVer) {
+        if (!checkRetryPermits(topVer))
+            return false;
+
+        GridDhtTopologyFuture lastFut = cctx.shared().exchange().lastFinishedFuture();
+
+        Throwable error = lastFut.validateCache(cctx, recovery, true, key, null);
+
+        if (error != null) {
+            onDone(error);
+
+            return false;
+        }
+        else
+            return true;
+    }
+
     /** {@inheritDoc} */
     @Override public IgniteUuid futureId() {
         return futId;
@@ -748,20 +843,9 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
             return false;
 
         if (canRemap) {
-            AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(
-                Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
-
-            cctx.shared().exchange().affinityReadyFuture(updTopVer).listen(
-                new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                        try {
-                            remap(fut.get());
-                        }
-                        catch (IgniteCheckedException e) {
-                            onDone(e);
-                        }
-                    }
-                });
+            long maxTopVer = Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion());
+
+            awaitVersionAndRemap(new AffinityTopologyVersion(maxTopVer));
         }
         else
             remap(topVer);
@@ -772,15 +856,30 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
     /**
      * @param topVer Topology version.
      */
+    private void awaitVersionAndRemap(AffinityTopologyVersion topVer){
+        IgniteInternalFuture<AffinityTopologyVersion> awaitTopologyVersionFuture =
+            cctx.shared().exchange().affinityReadyFuture(topVer);
+
+        awaitTopologyVersionFuture.listen(f -> {
+            try {
+                remap(f.get());
+            }
+            catch (IgniteCheckedException e) {
+                onDone(e);
+            }
+        });
+    }
+
+    /**
+     * @param topVer Topology version.
+     */
     private void remap(final AffinityTopologyVersion topVer) {
         cctx.closures().runLocalSafe(new Runnable() {
             @Override public void run() {
-                GridDhtTopologyFuture lastFut = cctx.shared().exchange().lastFinishedFuture();
-
-                Throwable error = lastFut.validateCache(cctx, recovery, true, key, null);
-
-                if (error != null)
-                    onDone(error);
+                // If topology changed reset collection of invalid nodes.
+                synchronized (this) {
+                    invalidNodes = Collections.emptySet();
+                }
 
                 map(topVer);
             }
@@ -834,6 +933,6 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridPartitionedSingleGetFuture.class, this);
+        return S.toString(GridPartitionedSingleGetFuture.class, this, "super", super.toString());
     }
 }