You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/01/12 15:45:04 UTC

ignite git commit: ignite-1811 Optimized cache 'get' on affinity node.

Repository: ignite
Updated Branches:
  refs/heads/ignite-1811 [created] 8f1f6626a


ignite-1811 Optimized cache 'get' on affinity node.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8f1f6626
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8f1f6626
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8f1f6626

Branch: refs/heads/ignite-1811
Commit: 8f1f6626a9dd413e55f1055d32c5335f1069eb82
Parents: 10012b4
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jan 12 17:44:54 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jan 12 17:44:54 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheContext.java      |  30 +++
 .../dht/CacheDistributedGetFutureAdapter.java   |  28 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  27 +-
 .../dht/GridPartitionedGetFuture.java           | 234 +++++++++--------
 .../dht/GridPartitionedSingleGetFuture.java     | 219 ++++++++-------
 .../distributed/near/GridNearGetFuture.java     | 263 +++++++++++--------
 6 files changed, 474 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8f1f6626/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index c10ebf3..1b9e081 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -111,6 +111,7 @@ import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
 import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 
 /**
  * Cache context.
@@ -1961,6 +1962,35 @@ public class GridCacheContext<K, V> implements Externalizable {
         });
     }
 
+    /**
+     * @param part Partition.
+     * @param affNodes Affinity nodes.
+     * @param topVer Topology version.
+     * @return {@code True} if cache 'get' operation is allowed to get entry locally.
+     */
+    public boolean allowFastLocalRead(int part, List<ClusterNode> affNodes, AffinityTopologyVersion topVer) {
+        return affinityNode() && hasPartition(part, affNodes, topVer);
+    }
+
+    /**
+     * @param part Partition.
+     * @param affNodes Affinity nodes.
+     * @param topVer Topology version.
+     * @return {@code True} if partition is available locally.
+     */
+    private boolean hasPartition(int part, List<ClusterNode> affNodes, AffinityTopologyVersion topVer) {
+        return topology().rebalanceFinished(topVer) && (isReplicated() || affNodes.contains(locNode))
+            || partitionOwned(part);
+    }
+
+    /**
+     * @param part Partition.
+     * @return {@code True} if the partition is in {@code OWNING} state on the local node.
+     */
+    private boolean partitionOwned(int part) {
+        return topology().partitionState(localNodeId(), part) == OWNING;
+    }
+
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         U.writeString(out, gridName());

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f1f6626/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index c43cce9..40eec63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -24,6 +24,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
@@ -39,6 +40,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 
 /**
  *
@@ -168,14 +170,11 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun
     /**
      * Affinity node to send get request to.
      *
-     * @param key Key to get.
-     * @param topVer Topology version.
+     * @param affNodes All affinity nodes.
      * @return Affinity node to get key from.
      */
-    protected final ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+    protected final ClusterNode affinityNode(List<ClusterNode> affNodes) {
         if (!canRemap) {
-            List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
-
             for (ClusterNode node : affNodes) {
                 if (cctx.discovery().alive(node))
                     return node;
@@ -184,6 +183,23 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun
             return null;
         }
         else
-            return cctx.affinity().primary(key, topVer);
+            return affNodes.get(0);
+    }
+
+    /**
+     * @param part Partition.
+     * @return {@code True} if the partition is in {@code OWNING} state on the local node.
+     */
+    protected final boolean partitionOwned(int part) {
+        return cctx.topology().partitionState(cctx.localNodeId(), part) == OWNING;
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @return Exception.
+     */
+    protected final ClusterTopologyServerNotFoundException serverNotFoundError(AffinityTopologyVersion topVer) {
+        return new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+            "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']');
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f1f6626/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index a0709c5..2ab8a12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -88,7 +88,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private GridDhtPartitionExchangeId lastExchangeId;
 
     /** */
-    private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
+    private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
 
     /** */
     private volatile boolean stopping;
@@ -136,9 +136,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             topReadyFut = null;
 
-            topVer = AffinityTopologyVersion.NONE;
-
             rebalancedTopVer = AffinityTopologyVersion.NONE;
+
+            topVer = AffinityTopologyVersion.NONE;
         }
         finally {
             lock.writeLock().unlock();
@@ -223,13 +223,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             this.stopping = stopping;
 
-            topVer = exchId.topologyVersion();
-
             updateSeq.setIfGreater(updSeq);
 
             topReadyFut = exchFut;
 
             rebalancedTopVer = AffinityTopologyVersion.NONE;
+
+            topVer = exchId.topologyVersion();
         }
         finally {
             lock.writeLock().unlock();
@@ -238,17 +238,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public AffinityTopologyVersion topologyVersion() {
-        lock.readLock().lock();
+        AffinityTopologyVersion topVer = this.topVer;
 
-        try {
-            assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer +
-                ", cacheName=" + cctx.name() + ']';
+        assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer +
+            ", cacheName=" + cctx.name() + ']';
 
-            return topVer;
-        }
-        finally {
-            lock.readLock().unlock();
-        }
+        return topVer;
     }
 
     /** {@inheritDoc} */
@@ -1336,7 +1331,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
-        return topVer.equals(rebalancedTopVer);
+        AffinityTopologyVersion curTopVer = this.topVer;
+
+        return curTopVer.equals(topVer) && curTopVer.equals(rebalancedTopVer);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f1f6626/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 19df1c2..9ed9dc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -234,15 +235,16 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
         AffinityTopologyVersion topVer
     ) {
-        if (CU.affinityNodes(cctx, topVer).isEmpty()) {
+        Collection<ClusterNode> cacheNodes = CU.affinityNodes(cctx, topVer);
+
+        if (cacheNodes.isEmpty()) {
             onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
                 "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'));
 
             return;
         }
 
-        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings =
-            U.newHashMap(CU.affinityNodes(cctx, topVer).size());
+        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings = U.newHashMap(cacheNodes.size());
 
         final int keysSize = keys.size();
 
@@ -374,135 +376,151 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         AffinityTopologyVersion topVer,
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped
     ) {
-        GridDhtCacheAdapter<K, V> colocated = cache();
+        int part = cctx.affinity().partition(key);
 
-        boolean remote = false;
+        List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer);
 
-        // Allow to get cached value from the local node.
-        boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) ||
-                cctx.affinity().primary(cctx.localNode(), key, topVer);
+        if (affNodes.isEmpty()) {
+            onDone(serverNotFoundError(topVer));
 
-        while (true) {
-            GridCacheEntryEx entry;
+            return false;
+        }
 
-            try {
-                if (allowLocRead) {
-                    try {
-                        entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
-                            colocated.peekEx(key);
-
-                        // If our DHT cache do has value, then we peek it.
-                        if (entry != null) {
-                            boolean isNew = entry.isNewLocked();
-
-                            CacheObject v = null;
-                            GridCacheVersion ver = null;
-
-                            if (needVer) {
-                                T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
-                                    null,
-                                    /*swap*/true,
-                                    /*unmarshal*/true,
-                                    /**update-metrics*/false,
-                                    /*event*/!skipVals,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiryPlc,
-                                    !deserializeBinary);
-
-                                if (res != null) {
-                                    v = res.get1();
-                                    ver = res.get2();
-                                }
-                            }
-                            else {
-                                v = entry.innerGet(null,
-                                    /*swap*/true,
-                                    /*read-through*/false,
-                                    /*fail-fast*/true,
-                                    /*unmarshal*/true,
-                                    /**update-metrics*/false,
-                                    /*event*/!skipVals,
-                                    /*temporary*/false,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiryPlc,
-                                    !deserializeBinary);
-                            }
+        boolean fastLocGet = (!forcePrimary || affNodes.get(0).isLocal()) &&
+            cctx.allowFastLocalRead(part, affNodes, topVer);
 
-                            colocated.context().evicts().touch(entry, topVer);
+        if (fastLocGet && localGet(key, part, locVals))
+            return false;
 
-                            // Entry was not in memory or in swap, so we remove it from cache.
-                            if (v == null) {
-                                if (isNew && entry.markObsoleteIfEmpty(ver))
-                                    colocated.removeIfObsolete(key);
-                            }
-                            else {
-                                if (needVer)
-                                    versionedResult(locVals, key, v, ver);
-                                else
-                                    cctx.addResult(locVals,
-                                        key,
-                                        v,
-                                        skipVals,
-                                        keepCacheObjects,
-                                        deserializeBinary,
-                                        true);
-
-                                return false;
-                            }
-                        }
-                    }
-                    catch (GridDhtInvalidPartitionException ignored) {
-                        // No-op.
-                    }
-                }
+        ClusterNode node = affinityNode(affNodes);
 
-                ClusterNode node = affinityNode(key, topVer);
+        if (node == null) {
+            onDone(serverNotFoundError(topVer));
 
-                if (node == null) {
-                    onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                        "(all partition nodes left the grid)."));
+            return false;
+        }
 
-                    return false;
-                }
+        boolean remote = !node.isLocal();
+
+        LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(node);
+
+        if (keys != null && keys.containsKey(key)) {
+            if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) {
+                onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
+                    MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
+                    U.toShortString(node) + ", mappings=" + mapped + ']'));
+
+                return false;
+            }
+        }
+
+        LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(node);
 
-                remote = !node.isLocal();
+        if (old == null)
+            mappings.put(node, old = new LinkedHashMap<>(3, 1f));
+
+        old.put(key, false);
+
+        return remote;
+    }
 
-                LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(node);
+    /**
+     * @param key Key.
+     * @param part Partition.
+     * @param locVals Local values.
+     * @return {@code True} if no further lookup is needed: value found, known to be absent, or future failed.
+     */
+    private boolean localGet(KeyCacheObject key, int part, Map<K, V> locVals) {
+        GridDhtCacheAdapter<K, V> cache = cache();
 
-                if (keys != null && keys.containsKey(key)) {
-                    if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) {
-                        onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
-                            MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
-                            U.toShortString(node) + ", mappings=" + mapped + ']'));
+        while (true) {
+            GridCacheEntryEx entry;
 
-                        return false;
+            try {
+                entry = cache.context().isSwapOrOffheapEnabled() ? cache.entryEx(key) : cache.peekEx(key);
+
+                // If our DHT cache does have a value, then we peek it.
+                if (entry != null) {
+                    boolean isNew = entry.isNewLocked();
+
+                    CacheObject v = null;
+                    GridCacheVersion ver = null;
+
+                    if (needVer) {
+                        T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                            null,
+                            /*swap*/true,
+                            /*unmarshal*/true,
+                            /**update-metrics*/false,
+                            /*event*/!skipVals,
+                            subjId,
+                            null,
+                            taskName,
+                            expiryPlc,
+                            !deserializeBinary);
+
+                        if (res != null) {
+                            v = res.get1();
+                            ver = res.get2();
+                        }
                     }
-                }
+                    else {
+                        v = entry.innerGet(null,
+                            /*swap*/true,
+                            /*read-through*/false,
+                            /*fail-fast*/true,
+                            /*unmarshal*/true,
+                            /**update-metrics*/false,
+                            /*event*/!skipVals,
+                            /*temporary*/false,
+                            subjId,
+                            null,
+                            taskName,
+                            expiryPlc,
+                            !deserializeBinary);
+                    }
+
+                    cache.context().evicts().touch(entry, topVer);
 
-                LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(node);
+                    // Entry was not in memory or in swap, so we remove it from cache.
+                    if (v == null) {
+                        if (isNew && entry.markObsoleteIfEmpty(ver))
+                            cache.removeIfObsolete(key);
+                    }
+                    else {
+                        if (needVer)
+                            versionedResult(locVals, key, v, ver);
+                        else {
+                            cctx.addResult(locVals,
+                                key,
+                                v,
+                                skipVals,
+                                keepCacheObjects,
+                                deserializeBinary,
+                                true);
+                        }
 
-                if (old == null)
-                    mappings.put(node, old = new LinkedHashMap<>(3, 1f));
+                        return true;
+                    }
+                }
 
-                old.put(key, false);
+                boolean topStable = topVer.equals(cctx.topology().topologyVersion());
 
-                break;
+                // Entry not found: stop searching if there is no store and topology did not change or partition is owned.
+                return !cctx.store().configured() && (topStable || partitionOwned(part));
+            }
+            catch (GridCacheEntryRemovedException ignored) {
+                // No-op, will retry.
+            }
+            catch (GridDhtInvalidPartitionException ignored) {
+                return false;
             }
             catch (IgniteCheckedException e) {
                 onDone(e);
 
-                break;
-            }
-            catch (GridCacheEntryRemovedException ignored) {
-                // No-op, will retry.
+                return true;
             }
         }
-
-        return remote;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f1f6626/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 29971fd..ffd7f23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -58,6 +58,8 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+
 /**
  *
  */
@@ -319,105 +321,149 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
      * @return Primary node or {@code null} if future was completed.
      */
     @Nullable private ClusterNode mapKeyToNode(AffinityTopologyVersion topVer) {
-        ClusterNode primary = affinityNode(key, topVer);
+        int part = cctx.affinity().partition(key);
+
+        List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer);
 
-        if (primary == null) {
-            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'));
+        if (affNodes.isEmpty()) {
+            onDone(serverNotFoundError(topVer));
 
             return null;
         }
 
-        boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) || primary.isLocal();
-
-        if (allowLocRead) {
-            GridDhtCacheAdapter colocated = cctx.dht();
-
-            while (true) {
-                GridCacheEntryEx entry;
-
-                try {
-                    entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
-                        colocated.peekEx(key);
-
-                    // If our DHT cache do has value, then we peek it.
-                    if (entry != null) {
-                        boolean isNew = entry.isNewLocked();
-
-                        CacheObject v = null;
-                        GridCacheVersion ver = null;
-
-                        if (needVer) {
-                            T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
-                                null,
-                                /*swap*/true,
-                                /*unmarshal*/true,
-                                /**update-metrics*/false,
-                                /*event*/!skipVals,
-                                subjId,
-                                null,
-                                taskName,
-                                expiryPlc,
-                                true);
-
-                            if (res != null) {
-                                v = res.get1();
-                                ver = res.get2();
-                            }
-                        }
-                        else {
-                            v = entry.innerGet(null,
-                                /*swap*/true,
-                                /*read-through*/false,
-                                /*fail-fast*/true,
-                                /*unmarshal*/true,
-                                /**update-metrics*/false,
-                                /*event*/!skipVals,
-                                /*temporary*/false,
-                                subjId,
-                                null,
-                                taskName,
-                                expiryPlc,
-                                true);
-                        }
+        boolean fastLocGet = (!forcePrimary || affNodes.get(0).isLocal()) &&
+            cctx.allowFastLocalRead(part, affNodes, topVer);
 
-                        colocated.context().evicts().touch(entry, topVer);
+        if (fastLocGet && localGet(topVer, part))
+            return null;
 
-                        // Entry was not in memory or in swap, so we remove it from cache.
-                        if (v == null) {
-                            if (isNew && entry.markObsoleteIfEmpty(ver))
-                                colocated.removeIfObsolete(key);
-                        }
-                        else {
-                            if (!skipVals && cctx.config().isStatisticsEnabled())
-                                cctx.cache().metrics0().onRead(true);
+        ClusterNode affNode = affinityNode(affNodes);
+
+        if (affNode == null) {
+            onDone(serverNotFoundError(topVer));
+
+            return null;
+        }
+
+        return affNode;
+    }
+
+    /**
+     * @param part Partition.
+     * @return {@code True} if the partition is in {@code OWNING} state on the local node.
+     */
+    private boolean partitionOwned(int part) {
+        return cctx.topology().partitionState(cctx.localNodeId(), part) == OWNING;
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @return Exception.
+     */
+    private ClusterTopologyServerNotFoundException serverNotFoundError(AffinityTopologyVersion topVer) {
+        return new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+            "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']');
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @param part Partition.
+     * @return {@code True} if future completed.
+     */
+    private boolean localGet(AffinityTopologyVersion topVer, int part) {
+        assert cctx.affinityNode() : this;
+
+        GridDhtCacheAdapter colocated = cctx.dht();
 
-                            if (!skipVals)
-                                setResult(v, ver);
-                            else
-                                setSkipValueResult(true, ver);
+        while (true) {
+            GridCacheEntryEx entry;
 
-                            return null;
+            try {
+                entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
+                    colocated.peekEx(key);
+
+                // If our DHT cache does have a value, then we peek it.
+                if (entry != null) {
+                    boolean isNew = entry.isNewLocked();
+
+                    CacheObject v = null;
+                    GridCacheVersion ver = null;
+
+                    if (needVer) {
+                        T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                            null,
+                            /*swap*/true,
+                            /*unmarshal*/true,
+                            /**update-metrics*/false,
+                            /*event*/!skipVals,
+                            subjId,
+                            null,
+                            taskName,
+                            expiryPlc,
+                            true);
+
+                        if (res != null) {
+                            v = res.get1();
+                            ver = res.get2();
                         }
+                    } else {
+                        v = entry.innerGet(null,
+                            /*swap*/true,
+                            /*read-through*/false,
+                            /*fail-fast*/true,
+                            /*unmarshal*/true,
+                            /**update-metrics*/false,
+                            /*event*/!skipVals,
+                            /*temporary*/false,
+                            subjId,
+                            null,
+                            taskName,
+                            expiryPlc,
+                            true);
                     }
 
-                    break;
-                }
-                catch (GridDhtInvalidPartitionException ignored) {
-                    break;
-                }
-                catch (IgniteCheckedException e) {
-                    onDone(e);
+                    colocated.context().evicts().touch(entry, topVer);
 
-                    return null;
+                    // Entry was not in memory or in swap, so we remove it from cache.
+                    if (v == null) {
+                        if (isNew && entry.markObsoleteIfEmpty(ver))
+                            colocated.removeIfObsolete(key);
+                    } else {
+                        if (!skipVals && cctx.config().isStatisticsEnabled())
+                            cctx.cache().metrics0().onRead(true);
+
+                        if (!skipVals)
+                            setResult(v, ver);
+                        else
+                            setSkipValueResult(true, ver);
+
+                        return true;
+                    }
                 }
-                catch (GridCacheEntryRemovedException ignored) {
-                    // No-op, will retry.
+
+                boolean topStable = topVer.equals(cctx.topology().topologyVersion());
+
+                // Entry not found: complete with null if there is no store and topology did not change or partition is owned.
+                if (!cctx.store().configured() && (topStable || partitionOwned(part))) {
+                    setResult(null, null);
+
+                    return true;
                 }
+
+                return false;
             }
-        }
+            catch (GridCacheEntryRemovedException ignored) {
+                // No-op, will retry.
+            }
+            catch (GridDhtInvalidPartitionException ignored) {
+                return false;
+            }
+            catch (IgniteCheckedException e) {
+                onDone(e);
 
-        return primary;
+                return true;
+            }
+        }
     }
 
     /**
@@ -614,14 +660,11 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
     /**
      * Affinity node to send get request to.
      *
-     * @param key Key to get.
-     * @param topVer Topology version.
+     * @param affNodes All affinity nodes.
      * @return Affinity node to get key from.
      */
-    private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+    @Nullable private ClusterNode affinityNode(List<ClusterNode> affNodes) {
         if (!canRemap) {
-            List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
-
             for (ClusterNode node : affNodes) {
                 if (cctx.discovery().alive(node))
                     return node;
@@ -630,7 +673,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
             return null;
         }
         else
-            return cctx.affinity().primary(key, topVer);
+            return affNodes.get(0);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f1f6626/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index c547a88..eb39112 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -405,10 +406,20 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
         Map<KeyCacheObject, GridNearCacheEntry> saved
     ) {
+        int part = cctx.affinity().partition(key);
+
+        List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer);
+
+        if (affNodes.isEmpty()) {
+            onDone(serverNotFoundError(topVer));
+
+            return null;
+        }
+
         final GridNearCacheAdapter near = cache();
 
         // Allow to get cached value from the local node.
-        boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(), key, topVer);
+        boolean allowLocRead = !forcePrimary || cctx.localNode().equals(affNodes.get(0));
 
         while (true) {
             GridNearCacheEntry entry = allowLocRead ? (GridNearCacheEntry)near.peekEx(key) : null;
@@ -456,119 +467,17 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                     }
                 }
 
-                ClusterNode affNode = null;
-
-                if (v == null && allowLocRead && cctx.affinityNode()) {
-                    GridDhtCacheAdapter<K, V> dht = cache().dht();
-
-                    GridCacheEntryEx dhtEntry = null;
-
-                    try {
-                        dhtEntry = dht.context().isSwapOrOffheapEnabled() ? dht.entryEx(key) : dht.peekEx(key);
-
-                        // If near cache does not have value, then we peek DHT cache.
-                        if (dhtEntry != null) {
-                            boolean isNew = dhtEntry.isNewLocked() || !dhtEntry.valid(topVer);
-
-                            if (needVer) {
-                                T2<CacheObject, GridCacheVersion> res = dhtEntry.innerGetVersioned(
-                                    null,
-                                    /*swap*/true,
-                                    /*unmarshal*/true,
-                                    /**update-metrics*/false,
-                                    /*event*/!isNear && !skipVals,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiryPlc,
-                                    !deserializeBinary);
-
-                                if (res != null) {
-                                    v = res.get1();
-                                    ver = res.get2();
-                                }
-                            }
-                            else {
-                                v = dhtEntry.innerGet(tx,
-                                    /*swap*/true,
-                                    /*read-through*/false,
-                                    /*fail-fast*/true,
-                                    /*unmarshal*/true,
-                                    /*update-metrics*/false,
-                                    /*events*/!isNear && !skipVals,
-                                    /*temporary*/false,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiryPlc,
-                                    !deserializeBinary);
-                            }
+                if (v == null) {
+                    boolean fastLocGet = allowLocRead && cctx.allowFastLocalRead(part, affNodes, topVer);
 
-                            // Entry was not in memory or in swap, so we remove it from cache.
-                            if (v == null && isNew && dhtEntry.markObsoleteIfEmpty(ver))
-                                dht.removeIfObsolete(key);
-                        }
-
-                        if (v != null) {
-                            if (cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
-                                near.metrics0().onRead(true);
-                        }
-                        else {
-                            affNode = affinityNode(key, topVer);
-
-                            if (affNode == null) {
-                                onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                                    "(all partition nodes left the grid)."));
+                    if (fastLocGet && localDhtGet(key, part, topVer, affNodes, isNear))
+                        break;
 
-                                return saved;
-                            }
-
-                            if (!affNode.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
-                                near.metrics0().onRead(false);
-                        }
-                    }
-                    catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
-                        // No-op.
-                    }
-                    finally {
-                        if (dhtEntry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))) {
-                            dht.context().evicts().touch(dhtEntry, topVer);
-
-                            entry = null;
-                        }
-                    }
-                }
-
-                if (v != null) {
-                    if (needVer) {
-                        V val0 = (V)new T2<>(skipVals ? true : v, ver);
-
-                        add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0)));
-                    }
-                    else {
-                        if (keepCacheObjects) {
-                            K key0 = (K)key;
-                            V val0 = (V)(skipVals ? true : v);
-
-                            add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
-                        }
-                        else {
-                            K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false);
-                            V val0 = !skipVals ?
-                                (V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false) :
-                                (V)Boolean.TRUE;
+                    ClusterNode affNode = affinityNode(affNodes);
 
-                            add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
-                        }
-                    }
-                }
-                else {
                     if (affNode == null) {
-                        affNode = affinityNode(key, topVer);
-
                         if (affNode == null) {
-                            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                                "(all partition nodes left the grid)."));
+                            onDone(serverNotFoundError(topVer));
 
                             return saved;
                         }
@@ -586,7 +495,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                         }
                     }
 
-                    if (!cctx.affinity().localNode(key, topVer)) {
+                    if (!affNodes.contains(cctx.localNode())) {
                         GridNearCacheEntry nearEntry = entry != null ? entry : near.entryExx(key, topVer);
 
                         nearEntry.reserveEviction();
@@ -612,6 +521,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
 
                     old.put(key, addRdr);
                 }
+                else
+                    addResult(key, v, ver);
 
                 break;
             }
@@ -633,6 +544,138 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
     }
 
     /**
+     * @param key Key.
+     * @param part Partition.
+     * @param topVer Topology version.
+     * @param affNodes All affinity nodes.
+     * @param nearRead {@code True} if a read from the near cache was already attempted.
+     * @return {@code True} if there is no need to search for the value any further.
+     */
+    private boolean localDhtGet(KeyCacheObject key,
+        int part,
+        AffinityTopologyVersion topVer,
+        List<ClusterNode> affNodes,
+        boolean nearRead) {
+        GridDhtCacheAdapter<K, V> dht = cache().dht();
+
+        while (true) {
+            GridCacheEntryEx dhtEntry = null;
+
+            try {
+                dhtEntry = dht.context().isSwapOrOffheapEnabled() ? dht.entryEx(key) : dht.peekEx(key);
+
+                CacheObject v = null;
+
+                // If near cache does not have value, then we peek DHT cache.
+                if (dhtEntry != null) {
+                    boolean isNew = dhtEntry.isNewLocked() || !dhtEntry.valid(topVer);
+
+                    if (needVer) {
+                        T2<CacheObject, GridCacheVersion> res = dhtEntry.innerGetVersioned(
+                            null,
+                            /*swap*/true,
+                            /*unmarshal*/true,
+                            /*update-metrics*/false,
+                            /*event*/!nearRead && !skipVals,
+                            subjId,
+                            null,
+                            taskName,
+                            expiryPlc,
+                            !deserializeBinary);
+
+                        if (res != null) {
+                            v = res.get1();
+                            ver = res.get2();
+                        }
+                    }
+                    else {
+                        v = dhtEntry.innerGet(tx,
+                            /*swap*/true,
+                            /*read-through*/false,
+                            /*fail-fast*/true,
+                            /*unmarshal*/true,
+                            /*update-metrics*/false,
+                            /*events*/!nearRead && !skipVals,
+                            /*temporary*/false,
+                            subjId,
+                            null,
+                            taskName,
+                            expiryPlc,
+                            !deserializeBinary);
+                    }
+
+                    // Entry was not in memory or in swap, so we remove it from cache.
+                    if (v == null && isNew && dhtEntry.markObsoleteIfEmpty(ver))
+                        dht.removeIfObsolete(key);
+                }
+
+                if (v != null) {
+                    if (cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
+                        cache().metrics0().onRead(true);
+
+                    addResult(key, v, ver);
+
+                    return true;
+                }
+                else {
+                    if (cctx.cache().configuration().isStatisticsEnabled() && !skipVals && !affNodes.get(0).isLocal())
+                        cache().metrics0().onRead(false);
+
+                    boolean topStable = topVer.equals(cctx.topology().topologyVersion());
+
+                    // Entry not found, do not continue search if topology did not change and there is no store.
+                    return !cctx.store().configured() && (topStable || partitionOwned(part));
+                }
+            }
+            catch (GridCacheEntryRemovedException ignored) {
+                // Retry.
+            }
+            catch (GridDhtInvalidPartitionException e) {
+                return false;
+            }
+            catch (IgniteCheckedException e) {
+                onDone(e);
+
+                return false;
+            }
+            finally {
+                if (dhtEntry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)))
+                    dht.context().evicts().touch(dhtEntry, topVer);
+            }
+        }
+    }
+
+    /**
+     * @param key Key.
+     * @param v Value.
+     * @param ver Version.
+     */
+    @SuppressWarnings("unchecked")
+    private void addResult(KeyCacheObject key, CacheObject v, GridCacheVersion ver) {
+        if (needVer) {
+            V val0 = (V)new T2<>(skipVals ? true : v, ver);
+
+            add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0)));
+        }
+        else {
+            if (keepCacheObjects) {
+                K key0 = (K)key;
+                V val0 = (V)(skipVals ? true : v);
+
+                add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
+            }
+            else {
+                K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false);
+                V val0 = !skipVals ?
+                    (V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false) :
+                    (V)Boolean.TRUE;
+
+                add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
+            }
+        }
+    }
+
+    /**
      * @return Near cache.
      */
     private GridNearCacheAdapter<K, V> cache() {