You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/05 13:25:45 UTC
[09/14] ignite git commit:
https://issues.apache.org/jira/browse/IGNITE-2329 - single get
https://issues.apache.org/jira/browse/IGNITE-2329 - single get
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7b987466
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7b987466
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7b987466
Branch: refs/heads/ignite-2329-1
Commit: 7b987466af3fa0789e4cde8eb3f9a44126be129c
Parents: 3e97d82
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Tue Feb 2 18:50:11 2016 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Tue Feb 2 18:50:11 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 18 +-
.../processors/cache/GridCachePreloader.java | 6 +
.../cache/GridCachePreloaderAdapter.java | 5 +
.../distributed/dht/GridDhtCacheAdapter.java | 69 ++-
.../distributed/dht/GridDhtEmbeddedFuture.java | 13 +-
.../distributed/dht/GridDhtGetSingleFuture.java | 479 +++++++++++++++++++
.../dht/preloader/GridDhtPreloader.java | 12 +
7 files changed, 564 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b987466/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 986e529..ef7d30a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1852,7 +1852,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) :
tx.topologyVersion();
- final Map<K1, V1> map = U.newHashMap(keys.size());
+ final Map<K1, V1> map = keys.size() == 1 ?
+ (Map<K1, V1>)new IgniteBiTuple<>() :
+ U.<K1, V1>newHashMap(keys.size());
final boolean storeEnabled = !skipVals && readThrough && ctx.readThrough();
@@ -2046,17 +2048,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
);
}
- else {
- // If misses is not empty and store is disabled, we should touch missed entries.
- if (misses != null) {
- for (KeyCacheObject key : misses.keySet()) {
- GridCacheEntryEx entry = peekEx(key);
-
- if (entry != null)
- ctx.evicts().touch(entry, topVer);
- }
- }
- }
+ else
+ // Misses can be non-null only if store is enabled.
+ assert misses == null;
return new GridFinishedFuture<>(map);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b987466/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index c8fcb90..be019fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -137,6 +137,12 @@ public interface GridCachePreloader {
public IgniteInternalFuture<Boolean> rebalanceFuture();
/**
+ * @return {@code true} if keys have to be force-preloaded before being read
+ * (e.g. rebalancing is disabled or has not completed successfully yet).
+ */
+ public boolean needForceKeys();
+
+ /**
* Requests that preloader sends the request for the key.
*
* @param keys Keys to request.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b987466/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index a1704fc..5d98c6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -93,6 +93,11 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
+ @Override public boolean needForceKeys() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public void onReconnected() {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b987466/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 749ee4d..8e456e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -719,21 +719,63 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
/**
* @param nodeId Node ID.
+ * @param msgId Message ID.
+ * @param key Key.
+ * @param addRdr Add reader flag.
+ * @param readThrough Read through flag.
+ * @param topVer Topology version.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash.
+ * @param expiry Expiry.
+ * @param skipVals Skip vals flag.
+ * @return Future for the operation.
+ */
+ private IgniteInternalFuture<GridCacheEntryInfo> getDhtSingleAsync(
+ UUID nodeId,
+ long msgId,
+ KeyCacheObject key,
+ boolean addRdr,
+ boolean readThrough,
+ AffinityTopologyVersion topVer,
+ @Nullable UUID subjId,
+ int taskNameHash,
+ @Nullable IgniteCacheExpiryPolicy expiry,
+ boolean skipVals
+ ) {
+ GridDhtGetSingleFuture<K, V> fut = new GridDhtGetSingleFuture<>(
+ ctx,
+ msgId,
+ nodeId,
+ key,
+ addRdr,
+ readThrough,
+ /*tx*/null,
+ topVer,
+ subjId,
+ taskNameHash,
+ expiry,
+ skipVals);
+
+ fut.init();
+
+ return fut;
+ }
+
+ /**
+ * @param nodeId Node ID.
* @param req Get request.
*/
protected void processNearSingleGetRequest(final UUID nodeId, final GridNearSingleGetRequest req) {
assert ctx.affinityNode();
- long ttl = req.accessTtl();
-
- final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(ttl);
-
- Map<KeyCacheObject, Boolean> map = Collections.singletonMap(req.key(), req.addReader());
+ final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(req.accessTtl());
- IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut =
- getDhtAsync(nodeId,
+ IgniteInternalFuture<GridCacheEntryInfo> fut =
+ getDhtSingleAsync(
+ nodeId,
req.messageId(),
- map,
+ req.key(),
+ req.addReader(),
req.readThrough(),
req.topologyVersion(),
req.subjectId(),
@@ -741,19 +783,16 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
expiryPlc,
req.skipValues());
- fut.listen(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>>() {
- @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> f) {
+ fut.listen(new CI1<IgniteInternalFuture<GridCacheEntryInfo>>() {
+ @Override public void apply(IgniteInternalFuture<GridCacheEntryInfo> f) {
GridNearSingleGetResponse res;
- GridDhtFuture<Collection<GridCacheEntryInfo>> fut =
- (GridDhtFuture<Collection<GridCacheEntryInfo>>)f;
+ GridDhtFuture<GridCacheEntryInfo> fut = (GridDhtFuture<GridCacheEntryInfo>)f;
try {
- Collection<GridCacheEntryInfo> entries = fut.get();
+ GridCacheEntryInfo info = fut.get();
if (F.isEmpty(fut.invalidPartitions())) {
- GridCacheEntryInfo info = F.first(entries);
-
Message res0 = null;
if (info != null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b987466/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java
index 0d10a93..1b9f743 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java
@@ -21,7 +21,6 @@ import java.util.Collection;
import java.util.Collections;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiClosure;
@@ -32,10 +31,6 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implem
/** */
private static final long serialVersionUID = 0L;
- /** Retries. */
- @GridToStringInclude
- private Collection<Integer> invalidParts;
-
/**
* @param c Closure.
* @param embedded Embedded.
@@ -45,8 +40,6 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implem
IgniteInternalFuture<B> embedded
) {
super(c, embedded);
-
- invalidParts = Collections.emptyList();
}
/**
@@ -58,17 +51,15 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implem
IgniteBiClosure<B, Exception, IgniteInternalFuture<A>> c
) {
super(embedded, c);
-
- invalidParts = Collections.emptyList();
}
/** {@inheritDoc} */
@Override public Collection<Integer> invalidPartitions() {
- return invalidParts;
+ return Collections.emptyList();
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtEmbeddedFuture.class, this, super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b987466/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
new file mode 100644
index 0000000..4439307
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCacheEntryInfo>
+ implements GridDhtFuture<GridCacheEntryInfo> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Logger reference. */
+ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Logger. */
+ private static IgniteLogger log;
+
+ /** Message ID. */
+ private long msgId;
+
+ /** */
+ private UUID reader;
+
+ /** Read through flag. */
+ private boolean readThrough;
+
+ /** Context. */
+ private GridCacheContext<K, V> cctx;
+
+ /** Key. */
+ private KeyCacheObject key;
+
+ /** */
+ private boolean addRdr;
+
+ /** Reserved partitions. */
+ private int part = -1;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Version. */
+ private GridCacheVersion ver;
+
+ /** Topology version .*/
+ private AffinityTopologyVersion topVer;
+
+ /** Transaction. */
+ private IgniteTxLocalEx tx;
+
+ /** Retries because ownership changed. */
+ private Collection<Integer> retries;
+
+ /** Subject ID. */
+ private UUID subjId;
+
+ /** Task name. */
+ private int taskNameHash;
+
+ /** Expiry policy. */
+ private IgniteCacheExpiryPolicy expiryPlc;
+
+ /** Skip values flag. */
+ private boolean skipVals;
+
+ /**
+ * Creates a DHT get future for a single key.
+ *
+ * @param cctx Context.
+ * @param msgId Message ID.
+ * @param reader Reader.
+ * @param key Key.
+ * @param addRdr Add reader flag.
+ * @param readThrough Read through flag.
+ * @param tx Transaction.
+ * @param topVer Topology version.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
+ * @param expiryPlc Expiry policy.
+ * @param skipVals Skip values flag.
+ */
+ public GridDhtGetSingleFuture(
+ GridCacheContext<K, V> cctx,
+ long msgId,
+ UUID reader,
+ KeyCacheObject key,
+ boolean addRdr,
+ boolean readThrough,
+ @Nullable IgniteTxLocalEx tx,
+ @NotNull AffinityTopologyVersion topVer,
+ @Nullable UUID subjId,
+ int taskNameHash,
+ @Nullable IgniteCacheExpiryPolicy expiryPlc,
+ boolean skipVals
+ ) {
+ assert reader != null;
+ assert key != null;
+
+ this.reader = reader;
+ this.cctx = cctx;
+ this.msgId = msgId;
+ this.key = key;
+ // The flag is kept in a primitive field, hence the primitive parameter: a boxed
+ // {@code null} could only have failed here on unboxing.
+ this.addRdr = addRdr;
+ this.readThrough = readThrough;
+ this.tx = tx;
+ this.topVer = topVer;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
+ this.expiryPlc = expiryPlc;
+ this.skipVals = skipVals;
+
+ futId = IgniteUuid.randomUuid();
+
+ // Outside of a transaction a fresh version is generated for this future.
+ ver = tx == null ? cctx.versions().next() : tx.xidVersion();
+
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, GridDhtGetSingleFuture.class);
+ }
+
+ /**
+ * Initializes future.
+ */
+ void init() {
+ map();
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Future version.
+ */
+ public GridCacheVersion version() {
+ return ver;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(GridCacheEntryInfo res, Throwable err) {
+ if (super.onDone(res, err)) {
+ // Release all partitions reserved by this future.
+ if (part != -1)
+ cctx.topology().releasePartitions(part);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Starts processing: when force-key preloading is needed, requests the key from the
+ * preloader first and continues with {@link #map0()} once that request completes;
+ * otherwise calls {@link #map0()} right away.
+ */
+ private void map() {
+ if (cctx.dht().dhtPreloader().needForceKeys()) {
+ GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(
+ Collections.singleton(key),
+ topVer);
+
+ if (fut != null) {
+ // Remember partitions reported as invalid by the preloader so that they are
+ // returned from invalidPartitions().
+ if (!F.isEmpty(fut.invalidPartitions())) {
+ if (retries == null)
+ retries = new HashSet<>();
+
+ retries.addAll(fut.invalidPartitions());
+ }
+
+ fut.listen(
+ new IgniteInClosure<IgniteInternalFuture<Object>>() {
+ @Override public void apply(IgniteInternalFuture<Object> fut) {
+ Throwable e = fut.error();
+
+ if (e != null) { // Check error first.
+ if (log.isDebugEnabled())
+ log.debug("Failed to request keys from preloader " +
+ "[keys=" + key + ", err=" + e + ']');
+
+ onDone(e);
+ }
+ else
+ map0();
+ }
+ }
+ );
+
+ return;
+ }
+ }
+
+ map0();
+ }
+
+ /**
+ * Reserves the partition of the key and starts the read. If the partition is already
+ * recorded in {@link #retries} or cannot be reserved, the future is completed with
+ * {@code null} and the partition is reported through {@link #invalidPartitions()}.
+ */
+ private void map0() {
+ int part = cctx.affinity().partition(key);
+
+ // Partition is already known to be invalid: nothing was reserved, so the read
+ // must not be started (getAsync() requires a reserved partition).
+ if (retries != null && retries.contains(part)) {
+ onDone((GridCacheEntryInfo)null);
+
+ return;
+ }
+
+ if (!map(key)) {
+ retries = Collections.singleton(part);
+
+ onDone((GridCacheEntryInfo)null);
+
+ return;
+ }
+
+ getAsync();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<Integer> invalidPartitions() {
+ return retries == null ? Collections.<Integer>emptyList() : retries;
+ }
+
+ /**
+ * @param key Key.
+ * @return {@code True} if mapped.
+ */
+ private boolean map(KeyCacheObject key) {
+ GridDhtLocalPartition part = topVer.topologyVersion() > 0 ?
+ cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) :
+ cache().topology().localPartition(key, false);
+
+ if (part == null)
+ return false;
+
+ assert this.part == -1;
+
+ // By reserving, we make sure that partition won't be unloaded while processed.
+ if (part.reserve()) {
+ this.part = part.id();
+
+ return true;
+ }
+ else
+ return false;
+ }
+
+ /**
+ *
+ */
+ @SuppressWarnings( {"unchecked", "IfMayBeConditional"})
+ private void getAsync() {
+ assert part != -1;
+
+ String taskName0 = cctx.kernalContext().job().currentTaskName();
+
+ if (taskName0 == null)
+ taskName0 = cctx.kernalContext().task().resolveTaskName(taskNameHash);
+
+ final String taskName = taskName0;
+
+ IgniteInternalFuture<Boolean> rdrFut = null;
+
+ ClusterNode readerNode = cctx.discovery().node(reader);
+
+ if (readerNode != null && !readerNode.isLocal() && cctx.discovery().cacheNearNode(readerNode, cctx.name())) {
+ while (true) {
+ GridDhtCacheEntry e = cache().entryExx(key, topVer);
+
+ try {
+ if (e.obsolete())
+ continue;
+
+ boolean addReader = (!e.deleted() && addRdr && !skipVals);
+
+ if (addReader)
+ e.unswap(false);
+
+ // Register reader. If there are active transactions for this entry,
+ // then will wait for their completion before proceeding.
+ // TODO: GG-4003:
+ // TODO: What if any transaction we wait for actually removes this entry?
+ // TODO: In this case seems like we will be stuck with untracked near entry.
+ // TODO: To fix, check that reader is contained in the list of readers once
+ // TODO: again after the returned future completes - if not, try again.
+ rdrFut = addReader ? e.addReader(reader, msgId, topVer) : null;
+
+ break;
+ }
+ catch (IgniteCheckedException err) {
+ onDone(err);
+
+ return;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry when getting a DHT value: " + e);
+ }
+ finally {
+ cctx.evicts().touch(e, topVer);
+ }
+ }
+ }
+
+ IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut;
+
+ if (rdrFut == null || rdrFut.isDone()) {
+ if (tx == null) {
+ fut = cache().getDhtAllAsync(
+ Collections.singleton(key),
+ readThrough,
+ subjId,
+ taskName,
+ expiryPlc,
+ skipVals,
+ /*can remap*/true);
+ }
+ else {
+ fut = tx.getAllAsync(cctx,
+ Collections.singleton(key),
+ /*deserialize binary*/false,
+ skipVals,
+ /*keep cache objects*/true,
+ /*skip store*/!readThrough,
+ false);
+ }
+ }
+ else {
+ rdrFut.listen(
+ new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+ Throwable e = fut.error();
+
+ if (e != null) {
+ onDone(e);
+
+ return;
+ }
+
+ IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut0;
+
+ if (tx == null) {
+ fut0 = cache().getDhtAllAsync(
+ Collections.singleton(key),
+ readThrough,
+ subjId,
+ taskName,
+ expiryPlc,
+ skipVals,
+ /*can remap*/true);
+ }
+ else {
+ fut0 = tx.getAllAsync(cctx,
+ Collections.singleton(key),
+ /*deserialize binary*/false,
+ skipVals,
+ /*keep cache objects*/true,
+ /*skip store*/!readThrough,
+ false
+ );
+ }
+
+ fut0.listen(createGetFutureListener());
+ }
+ }
+ );
+
+ return;
+ }
+
+ if (fut.isDone())
+ onResult(fut);
+ else
+ fut.listen(createGetFutureListener());
+ }
+
+ /**
+ * @return Listener for get future.
+ */
+ @NotNull private IgniteInClosure<IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>>>
+ createGetFutureListener() {
+ return new IgniteInClosure<IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>>>() {
+ @Override public void apply(
+ IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut
+ ) {
+ onResult(fut);
+ }
+ };
+ }
+
+ /**
+ * Completes this future with the outcome of the given read future: its error if it
+ * failed, otherwise the entry info built from its result.
+ *
+ * @param fut Completed future to finish this process with.
+ */
+ private void onResult(IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut) {
+ assert fut.isDone();
+
+ if (fut.error() != null)
+ onDone(fut.error());
+ else {
+ try {
+ onDone(toEntryInfo(fut.get()));
+ }
+ catch (IgniteCheckedException e) {
+ // Not expected once the error check above has passed, but never leave this
+ // future incomplete: with assertions disabled the failure would be lost.
+ onDone(e);
+ }
+ }
+ }
+
+ /**
+ * @param map Map to convert (only its first entry is used).
+ * @return Entry info for the first map entry or {@code null} if there is none.
+ */
+ private GridCacheEntryInfo toEntryInfo(Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map) {
+ Map.Entry<KeyCacheObject, T2<CacheObject, GridCacheVersion>> e = F.firstEntry(map);
+
+ if (e != null) {
+ GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+ T2<CacheObject, GridCacheVersion> val = e.getValue();
+
+ assert val != null;
+
+ info.cacheId(cctx.cacheId());
+ info.key(e.getKey());
+ info.value(skipVals ? null : val.get1());
+ info.version(val.get2());
+
+ return info;
+ }
+
+ return null;
+ }
+
+ /**
+ * @return DHT cache.
+ */
+ private GridDhtCacheAdapter<K, V> cache() {
+ return (GridDhtCacheAdapter<K, V>)cctx.cache();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b987466/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 0bff618..a92a080 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -693,6 +693,18 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
}
+ /** {@inheritDoc} */
+ @Override public boolean needForceKeys() {
+ // Rebalancing disabled: keys always have to be forced.
+ if (!cctx.rebalanceEnabled())
+ return true;
+
+ IgniteInternalFuture<Boolean> fut = rebalanceFuture();
+
+ // Forcing is unnecessary only once the rebalance future is done with a true result.
+ return !fut.isDone() || !Boolean.TRUE.equals(fut.result());
+ }
+
/**
* @param keys Keys to request.
* @return Future for request.