You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/01/12 15:45:04 UTC
ignite git commit: ignite-1811 Optimized cache 'get' on affinity node.
Repository: ignite
Updated Branches:
refs/heads/ignite-1811 [created] 8f1f6626a
ignite-1811 Optimized cache 'get' on affinity node.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8f1f6626
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8f1f6626
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8f1f6626
Branch: refs/heads/ignite-1811
Commit: 8f1f6626a9dd413e55f1055d32c5335f1069eb82
Parents: 10012b4
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jan 12 17:44:54 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jan 12 17:44:54 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheContext.java | 30 +++
.../dht/CacheDistributedGetFutureAdapter.java | 28 +-
.../dht/GridDhtPartitionTopologyImpl.java | 27 +-
.../dht/GridPartitionedGetFuture.java | 234 +++++++++--------
.../dht/GridPartitionedSingleGetFuture.java | 219 ++++++++-------
.../distributed/near/GridNearGetFuture.java | 263 +++++++++++--------
6 files changed, 474 insertions(+), 327 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f1f6626/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index c10ebf3..1b9e081 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -111,6 +111,7 @@ import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
/**
* Cache context.
@@ -1961,6 +1962,35 @@ public class GridCacheContext<K, V> implements Externalizable {
});
}
+ /**
+ * @param part Partition.
+ * @param affNodes Affinity nodes.
+ * @param topVer Topology version.
+ * @return {@code True} if cache 'get' operation is allowed to get entry locally.
+ */
+ public boolean allowFastLocalRead(int part, List<ClusterNode> affNodes, AffinityTopologyVersion topVer) {
+ return affinityNode() && hasPartition(part, affNodes, topVer);
+ }
+
+ /**
+ * @param part Partition.
+ * @param affNodes Affinity nodes.
+ * @param topVer Topology version.
+ * @return {@code True} if partition is available locally.
+ */
+ private boolean hasPartition(int part, List<ClusterNode> affNodes, AffinityTopologyVersion topVer) {
+ return topology().rebalanceFinished(topVer) && (isReplicated() || affNodes.contains(locNode))
+ || partitionOwned(part);
+ }
+
+ /**
+ * @param part Partition.
+ * @return {@code True} if partition is in owned state.
+ */
+ private boolean partitionOwned(int part) {
+ return topology().partitionState(localNodeId(), part) == OWNING;
+ }
+
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeString(out, gridName());
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f1f6626/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index c43cce9..40eec63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -24,6 +24,7 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
@@ -39,6 +40,7 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
/**
*
@@ -168,14 +170,11 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun
/**
* Affinity node to send get request to.
*
- * @param key Key to get.
- * @param topVer Topology version.
+ * @param affNodes All affinity nodes.
* @return Affinity node to get key from.
*/
- protected final ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+ protected final ClusterNode affinityNode(List<ClusterNode> affNodes) {
if (!canRemap) {
- List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
-
for (ClusterNode node : affNodes) {
if (cctx.discovery().alive(node))
return node;
@@ -184,6 +183,23 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun
return null;
}
else
- return cctx.affinity().primary(key, topVer);
+ return affNodes.get(0);
+ }
+
+ /**
+ * @param part Partition.
+ * @return {@code True} if partition is in owned state.
+ */
+ protected final boolean partitionOwned(int part) {
+ return cctx.topology().partitionState(cctx.localNodeId(), part) == OWNING;
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @return Exception.
+ */
+ protected final ClusterTopologyServerNotFoundException serverNotFoundError(AffinityTopologyVersion topVer) {
+ return new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']');
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f1f6626/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index a0709c5..2ab8a12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -88,7 +88,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
private GridDhtPartitionExchangeId lastExchangeId;
/** */
- private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
+ private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
/** */
private volatile boolean stopping;
@@ -136,9 +136,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
topReadyFut = null;
- topVer = AffinityTopologyVersion.NONE;
-
rebalancedTopVer = AffinityTopologyVersion.NONE;
+
+ topVer = AffinityTopologyVersion.NONE;
}
finally {
lock.writeLock().unlock();
@@ -223,13 +223,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
this.stopping = stopping;
- topVer = exchId.topologyVersion();
-
updateSeq.setIfGreater(updSeq);
topReadyFut = exchFut;
rebalancedTopVer = AffinityTopologyVersion.NONE;
+
+ topVer = exchId.topologyVersion();
}
finally {
lock.writeLock().unlock();
@@ -238,17 +238,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public AffinityTopologyVersion topologyVersion() {
- lock.readLock().lock();
+ AffinityTopologyVersion topVer = this.topVer;
- try {
- assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer +
- ", cacheName=" + cctx.name() + ']';
+ assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer +
+ ", cacheName=" + cctx.name() + ']';
- return topVer;
- }
- finally {
- lock.readLock().unlock();
- }
+ return topVer;
}
/** {@inheritDoc} */
@@ -1336,7 +1331,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
- return topVer.equals(rebalancedTopVer);
+ AffinityTopologyVersion curTopVer = this.topVer;
+
+ return curTopVer.equals(topVer) && curTopVer.equals(rebalancedTopVer);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f1f6626/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 19df1c2..9ed9dc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
@@ -234,15 +235,16 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
AffinityTopologyVersion topVer
) {
- if (CU.affinityNodes(cctx, topVer).isEmpty()) {
+ Collection<ClusterNode> cacheNodes = CU.affinityNodes(cctx, topVer);
+
+ if (cacheNodes.isEmpty()) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
"(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'));
return;
}
- Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings =
- U.newHashMap(CU.affinityNodes(cctx, topVer).size());
+ Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings = U.newHashMap(cacheNodes.size());
final int keysSize = keys.size();
@@ -374,135 +376,151 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
AffinityTopologyVersion topVer,
Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped
) {
- GridDhtCacheAdapter<K, V> colocated = cache();
+ int part = cctx.affinity().partition(key);
- boolean remote = false;
+ List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer);
- // Allow to get cached value from the local node.
- boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) ||
- cctx.affinity().primary(cctx.localNode(), key, topVer);
+ if (affNodes.isEmpty()) {
+ onDone(serverNotFoundError(topVer));
- while (true) {
- GridCacheEntryEx entry;
+ return false;
+ }
- try {
- if (allowLocRead) {
- try {
- entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
- colocated.peekEx(key);
-
- // If our DHT cache do has value, then we peek it.
- if (entry != null) {
- boolean isNew = entry.isNewLocked();
-
- CacheObject v = null;
- GridCacheVersion ver = null;
-
- if (needVer) {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
- null,
- /*swap*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary);
-
- if (res != null) {
- v = res.get1();
- ver = res.get2();
- }
- }
- else {
- v = entry.innerGet(null,
- /*swap*/true,
- /*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- /*temporary*/false,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary);
- }
+ boolean fastLocGet = (!forcePrimary || affNodes.get(0).isLocal()) &&
+ cctx.allowFastLocalRead(part, affNodes, topVer);
- colocated.context().evicts().touch(entry, topVer);
+ if (fastLocGet && localGet(key, part, locVals))
+ return false;
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null) {
- if (isNew && entry.markObsoleteIfEmpty(ver))
- colocated.removeIfObsolete(key);
- }
- else {
- if (needVer)
- versionedResult(locVals, key, v, ver);
- else
- cctx.addResult(locVals,
- key,
- v,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- true);
-
- return false;
- }
- }
- }
- catch (GridDhtInvalidPartitionException ignored) {
- // No-op.
- }
- }
+ ClusterNode node = affinityNode(affNodes);
- ClusterNode node = affinityNode(key, topVer);
+ if (node == null) {
+ onDone(serverNotFoundError(topVer));
- if (node == null) {
- onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid)."));
+ return false;
+ }
- return false;
- }
+ boolean remote = !node.isLocal();
+
+ LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(node);
+
+ if (keys != null && keys.containsKey(key)) {
+ if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) {
+ onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
+ MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
+ U.toShortString(node) + ", mappings=" + mapped + ']'));
+
+ return false;
+ }
+ }
+
+ LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(node);
- remote = !node.isLocal();
+ if (old == null)
+ mappings.put(node, old = new LinkedHashMap<>(3, 1f));
+
+ old.put(key, false);
+
+ return remote;
+ }
- LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(node);
+ /**
+ * @param key Key.
+ * @param part Partition.
+ * @param locVals Local values.
+ * @return {@code True} if there is no need to further search value.
+ */
+ private boolean localGet(KeyCacheObject key, int part, Map<K, V> locVals) {
+ GridDhtCacheAdapter<K, V> cache = cache();
- if (keys != null && keys.containsKey(key)) {
- if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) {
- onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
- MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
- U.toShortString(node) + ", mappings=" + mapped + ']'));
+ while (true) {
+ GridCacheEntryEx entry;
- return false;
+ try {
+ entry = cache.context().isSwapOrOffheapEnabled() ? cache.entryEx(key) : cache.peekEx(key);
+
+ // If our DHT cache do has value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!skipVals,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary);
+
+ if (res != null) {
+ v = res.get1();
+ ver = res.get2();
+ }
}
- }
+ else {
+ v = entry.innerGet(null,
+ /*swap*/true,
+ /*read-through*/false,
+ /*fail-fast*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!skipVals,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary);
+ }
+
+ cache.context().evicts().touch(entry, topVer);
- LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(node);
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null) {
+ if (isNew && entry.markObsoleteIfEmpty(ver))
+ cache.removeIfObsolete(key);
+ }
+ else {
+ if (needVer)
+ versionedResult(locVals, key, v, ver);
+ else {
+ cctx.addResult(locVals,
+ key,
+ v,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ true);
+ }
- if (old == null)
- mappings.put(node, old = new LinkedHashMap<>(3, 1f));
+ return true;
+ }
+ }
- old.put(key, false);
+ boolean topStable = topVer.equals(cctx.topology().topologyVersion());
- break;
+ // Entry not found, do not continue search if topology did not change and there is no store.
+ return !cctx.store().configured() && (topStable || partitionOwned(part));
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // No-op, will retry.
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ return false;
}
catch (IgniteCheckedException e) {
onDone(e);
- break;
- }
- catch (GridCacheEntryRemovedException ignored) {
- // No-op, will retry.
+ return true;
}
}
-
- return remote;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f1f6626/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 29971fd..ffd7f23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -58,6 +58,8 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+
/**
*
*/
@@ -319,105 +321,149 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
* @return Primary node or {@code null} if future was completed.
*/
@Nullable private ClusterNode mapKeyToNode(AffinityTopologyVersion topVer) {
- ClusterNode primary = affinityNode(key, topVer);
+ int part = cctx.affinity().partition(key);
+
+ List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer);
- if (primary == null) {
- onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'));
+ if (affNodes.isEmpty()) {
+ onDone(serverNotFoundError(topVer));
return null;
}
- boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) || primary.isLocal();
-
- if (allowLocRead) {
- GridDhtCacheAdapter colocated = cctx.dht();
-
- while (true) {
- GridCacheEntryEx entry;
-
- try {
- entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
- colocated.peekEx(key);
-
- // If our DHT cache do has value, then we peek it.
- if (entry != null) {
- boolean isNew = entry.isNewLocked();
-
- CacheObject v = null;
- GridCacheVersion ver = null;
-
- if (needVer) {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
- null,
- /*swap*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- true);
-
- if (res != null) {
- v = res.get1();
- ver = res.get2();
- }
- }
- else {
- v = entry.innerGet(null,
- /*swap*/true,
- /*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- /*temporary*/false,
- subjId,
- null,
- taskName,
- expiryPlc,
- true);
- }
+ boolean fastLocGet = (!forcePrimary || affNodes.get(0).isLocal()) &&
+ cctx.allowFastLocalRead(part, affNodes, topVer);
- colocated.context().evicts().touch(entry, topVer);
+ if (fastLocGet && localGet(topVer, part))
+ return null;
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null) {
- if (isNew && entry.markObsoleteIfEmpty(ver))
- colocated.removeIfObsolete(key);
- }
- else {
- if (!skipVals && cctx.config().isStatisticsEnabled())
- cctx.cache().metrics0().onRead(true);
+ ClusterNode affNode = affinityNode(affNodes);
+
+ if (affNode == null) {
+ onDone(serverNotFoundError(topVer));
+
+ return null;
+ }
+
+ return affNode;
+ }
+
+ /**
+ * @param part Partition.
+ * @return {@code True} if partition is in owned state.
+ */
+ private boolean partitionOwned(int part) {
+ return cctx.topology().partitionState(cctx.localNodeId(), part) == OWNING;
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @return Exception.
+ */
+ private ClusterTopologyServerNotFoundException serverNotFoundError(AffinityTopologyVersion topVer) {
+ return new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']');
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @param part Partition.
+ * @return {@code True} if future completed.
+ */
+ private boolean localGet(AffinityTopologyVersion topVer, int part) {
+ assert cctx.affinityNode() : this;
+
+ GridDhtCacheAdapter colocated = cctx.dht();
- if (!skipVals)
- setResult(v, ver);
- else
- setSkipValueResult(true, ver);
+ while (true) {
+ GridCacheEntryEx entry;
- return null;
+ try {
+ entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
+ colocated.peekEx(key);
+
+ // If our DHT cache do has value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!skipVals,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ true);
+
+ if (res != null) {
+ v = res.get1();
+ ver = res.get2();
}
+ } else {
+ v = entry.innerGet(null,
+ /*swap*/true,
+ /*read-through*/false,
+ /*fail-fast*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!skipVals,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ true);
}
- break;
- }
- catch (GridDhtInvalidPartitionException ignored) {
- break;
- }
- catch (IgniteCheckedException e) {
- onDone(e);
+ colocated.context().evicts().touch(entry, topVer);
- return null;
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null) {
+ if (isNew && entry.markObsoleteIfEmpty(ver))
+ colocated.removeIfObsolete(key);
+ } else {
+ if (!skipVals && cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onRead(true);
+
+ if (!skipVals)
+ setResult(v, ver);
+ else
+ setSkipValueResult(true, ver);
+
+ return true;
+ }
}
- catch (GridCacheEntryRemovedException ignored) {
- // No-op, will retry.
+
+ boolean topStable = topVer.equals(cctx.topology().topologyVersion());
+
+ // Entry not found, complete future with null result if topology did not change and there is no store.
+ if (!cctx.store().configured() && (topStable || partitionOwned(part))) {
+ setResult(null, null);
+
+ return true;
}
+
+ return false;
}
- }
+ catch (GridCacheEntryRemovedException ignored) {
+ // No-op, will retry.
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ return false;
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
- return primary;
+ return true;
+ }
+ }
}
/**
@@ -614,14 +660,11 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
/**
* Affinity node to send get request to.
*
- * @param key Key to get.
- * @param topVer Topology version.
+ * @param affNodes All affinity nodes.
* @return Affinity node to get key from.
*/
- private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+ @Nullable private ClusterNode affinityNode(List<ClusterNode> affNodes) {
if (!canRemap) {
- List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
-
for (ClusterNode node : affNodes) {
if (cctx.discovery().alive(node))
return node;
@@ -630,7 +673,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
return null;
}
else
- return cctx.affinity().primary(key, topVer);
+ return affNodes.get(0);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f1f6626/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index c547a88..eb39112 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
@@ -405,10 +406,20 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
Map<KeyCacheObject, GridNearCacheEntry> saved
) {
+ int part = cctx.affinity().partition(key);
+
+ List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer);
+
+ if (affNodes.isEmpty()) {
+ onDone(serverNotFoundError(topVer));
+
+ return null;
+ }
+
final GridNearCacheAdapter near = cache();
// Allow to get cached value from the local node.
- boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(), key, topVer);
+ boolean allowLocRead = !forcePrimary || cctx.localNode().equals(affNodes.get(0));
while (true) {
GridNearCacheEntry entry = allowLocRead ? (GridNearCacheEntry)near.peekEx(key) : null;
@@ -456,119 +467,17 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
}
}
- ClusterNode affNode = null;
-
- if (v == null && allowLocRead && cctx.affinityNode()) {
- GridDhtCacheAdapter<K, V> dht = cache().dht();
-
- GridCacheEntryEx dhtEntry = null;
-
- try {
- dhtEntry = dht.context().isSwapOrOffheapEnabled() ? dht.entryEx(key) : dht.peekEx(key);
-
- // If near cache does not have value, then we peek DHT cache.
- if (dhtEntry != null) {
- boolean isNew = dhtEntry.isNewLocked() || !dhtEntry.valid(topVer);
-
- if (needVer) {
- T2<CacheObject, GridCacheVersion> res = dhtEntry.innerGetVersioned(
- null,
- /*swap*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!isNear && !skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary);
-
- if (res != null) {
- v = res.get1();
- ver = res.get2();
- }
- }
- else {
- v = dhtEntry.innerGet(tx,
- /*swap*/true,
- /*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
- /*update-metrics*/false,
- /*events*/!isNear && !skipVals,
- /*temporary*/false,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary);
- }
+ if (v == null) {
+ boolean fastLocGet = allowLocRead && cctx.allowFastLocalRead(part, affNodes, topVer);
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null && isNew && dhtEntry.markObsoleteIfEmpty(ver))
- dht.removeIfObsolete(key);
- }
-
- if (v != null) {
- if (cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
- near.metrics0().onRead(true);
- }
- else {
- affNode = affinityNode(key, topVer);
-
- if (affNode == null) {
- onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid)."));
+ if (fastLocGet && localDhtGet(key, part, topVer, affNodes, isNear))
+ break;
- return saved;
- }
-
- if (!affNode.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
- near.metrics0().onRead(false);
- }
- }
- catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
- // No-op.
- }
- finally {
- if (dhtEntry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))) {
- dht.context().evicts().touch(dhtEntry, topVer);
-
- entry = null;
- }
- }
- }
-
- if (v != null) {
- if (needVer) {
- V val0 = (V)new T2<>(skipVals ? true : v, ver);
-
- add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0)));
- }
- else {
- if (keepCacheObjects) {
- K key0 = (K)key;
- V val0 = (V)(skipVals ? true : v);
-
- add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
- }
- else {
- K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false);
- V val0 = !skipVals ?
- (V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false) :
- (V)Boolean.TRUE;
+ ClusterNode affNode = affinityNode(affNodes);
- add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
- }
- }
- }
- else {
if (affNode == null) {
- affNode = affinityNode(key, topVer);
-
if (affNode == null) {
- onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid)."));
+ onDone(serverNotFoundError(topVer));
return saved;
}
@@ -586,7 +495,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
}
}
- if (!cctx.affinity().localNode(key, topVer)) {
+ if (!affNodes.contains(cctx.localNode())) {
GridNearCacheEntry nearEntry = entry != null ? entry : near.entryExx(key, topVer);
nearEntry.reserveEviction();
@@ -612,6 +521,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
old.put(key, addRdr);
}
+ else
+ addResult(key, v, ver);
break;
}
@@ -633,6 +544,138 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
}
/**
+ * @param key Key.
+ * @param part Partition.
+ * @param topVer Topology version.
+ * @param affNodes All affinity nodes.
+ * @param nearRead {@code True} if tried to read from near cache.
+ * @return {@code True} if there is no need to further search value.
+ */
+ private boolean localDhtGet(KeyCacheObject key,
+ int part,
+ AffinityTopologyVersion topVer,
+ List<ClusterNode> affNodes,
+ boolean nearRead) {
+ GridDhtCacheAdapter<K, V> dht = cache().dht();
+
+ while (true) {
+ GridCacheEntryEx dhtEntry = null;
+
+ try {
+ dhtEntry = dht.context().isSwapOrOffheapEnabled() ? dht.entryEx(key) : dht.peekEx(key);
+
+ CacheObject v = null;
+
+ // If near cache does not have value, then we peek DHT cache.
+ if (dhtEntry != null) {
+ boolean isNew = dhtEntry.isNewLocked() || !dhtEntry.valid(topVer);
+
+ if (needVer) {
+ T2<CacheObject, GridCacheVersion> res = dhtEntry.innerGetVersioned(
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!nearRead && !skipVals,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary);
+
+ if (res != null) {
+ v = res.get1();
+ ver = res.get2();
+ }
+ }
+ else {
+ v = dhtEntry.innerGet(tx,
+ /*swap*/true,
+ /*read-through*/false,
+ /*fail-fast*/true,
+ /*unmarshal*/true,
+ /*update-metrics*/false,
+ /*events*/!nearRead && !skipVals,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary);
+ }
+
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null && isNew && dhtEntry.markObsoleteIfEmpty(ver))
+ dht.removeIfObsolete(key);
+ }
+
+ if (v != null) {
+ if (cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
+ cache().metrics0().onRead(true);
+
+ addResult(key, v, ver);
+
+ return true;
+ }
+ else {
+ if (cctx.cache().configuration().isStatisticsEnabled() && !skipVals && !affNodes.get(0).isLocal())
+ cache().metrics0().onRead(false);
+
+ boolean topStable = topVer.equals(cctx.topology().topologyVersion());
+
+ // Entry not found, do not continue search if topology did not change and there is no store.
+ return !cctx.store().configured() && (topStable || partitionOwned(part));
+ }
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // Retry.
+ }
+ catch (GridDhtInvalidPartitionException e) {
+ return false;
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+
+ return false;
+ }
+ finally {
+ if (dhtEntry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)))
+ dht.context().evicts().touch(dhtEntry, topVer);
+ }
+ }
+ }
+
+ /**
+ * @param key Key.
+ * @param v Value.
+ * @param ver Version.
+ */
+ @SuppressWarnings("unchecked")
+ private void addResult(KeyCacheObject key, CacheObject v, GridCacheVersion ver) {
+ if (needVer) {
+ V val0 = (V)new T2<>(skipVals ? true : v, ver);
+
+ add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0)));
+ }
+ else {
+ if (keepCacheObjects) {
+ K key0 = (K)key;
+ V val0 = (V)(skipVals ? true : v);
+
+ add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
+ }
+ else {
+ K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false);
+ V val0 = !skipVals ?
+ (V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false) :
+ (V)Boolean.TRUE;
+
+ add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
+ }
+ }
+ }
+
+ /**
* @return Near cache.
*/
private GridNearCacheAdapter<K, V> cache() {