You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/11/20 12:05:57 UTC
[20/37] ignite git commit: Optimization for single key cache 'get'
operation.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
new file mode 100644
index 0000000..8f2357b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CIX1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> implements GridCacheFuture<Object>,
+ CacheGetFuture {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ public static final IgniteProductVersion SINGLE_GET_MSG_SINCE = IgniteProductVersion.fromString("1.5.0");
+
+ /** Logger reference. */
+ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Logger. */
+ private static IgniteLogger log;
+
+ /** Topology version. */
+ private AffinityTopologyVersion topVer;
+
+ /** Context. */
+ private final GridCacheContext cctx;
+
+ /** Key. */
+ private final KeyCacheObject key;
+
+ /** Read through flag. */
+ private final boolean readThrough;
+
+ /** Force primary flag. */
+ private final boolean forcePrimary;
+
+ /** Future ID. */
+ private final IgniteUuid futId;
+
+ /** Trackable flag. */
+ private boolean trackable;
+
+ /** Subject ID. */
+ private final UUID subjId;
+
+ /** Task name. */
+ private final String taskName;
+
+ /** Whether to deserialize portable objects. */
+ private boolean deserializePortable;
+
+ /** Skip values flag. */
+ private boolean skipVals;
+
+ /** Expiry policy. */
+ private IgniteCacheExpiryPolicy expiryPlc;
+
+ /** Flag indicating that get should be done on a locked topology version. */
+ private final boolean canRemap;
+
+ /** */
+ private final boolean needVer;
+
+ /** */
+ private final boolean keepCacheObjects;
+
+ /** */
+ private ClusterNode node;
+
+ /**
+ * @param cctx Context.
+ * @param key Key.
+ * @param topVer Topology version.
+ * @param readThrough Read through flag.
+ * @param forcePrimary If {@code true} then will force network trip to primary node even if called on backup node.
+ * @param subjId Subject ID.
+ * @param taskName Task name.
+ * @param deserializePortable Deserialize portable flag.
+ * @param expiryPlc Expiry policy.
+ * @param skipVals Skip values flag.
+ * @param canRemap Flag indicating whether future can be remapped on a newer topology version.
+ * @param needVer If {@code true} returns values as tuples containing value and version.
+ * @param keepCacheObjects Keep cache objects flag.
+ */
+ public GridPartitionedSingleGetFuture(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ AffinityTopologyVersion topVer,
+ boolean readThrough,
+ boolean forcePrimary,
+ @Nullable UUID subjId,
+ String taskName,
+ boolean deserializePortable,
+ @Nullable IgniteCacheExpiryPolicy expiryPlc,
+ boolean skipVals,
+ boolean canRemap,
+ boolean needVer,
+ boolean keepCacheObjects
+ ) {
+ assert key != null;
+
+ this.cctx = cctx;
+ this.key = key;
+ this.readThrough = readThrough;
+ this.forcePrimary = forcePrimary;
+ this.subjId = subjId;
+ this.taskName = taskName;
+ this.deserializePortable = deserializePortable;
+ this.expiryPlc = expiryPlc;
+ this.skipVals = skipVals;
+ this.canRemap = canRemap;
+ this.needVer = needVer;
+ this.keepCacheObjects = keepCacheObjects;
+ this.topVer = topVer;
+
+ futId = IgniteUuid.randomUuid();
+
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, GridPartitionedSingleGetFuture.class);
+ }
+
+ /**
+ *
+ */
+ public void init() {
+ AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer :
+ canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
+
+ map(topVer);
+ }
+
+ /**
+ * @param topVer Topology version.
+ */
+ @SuppressWarnings("unchecked")
+ private void map(AffinityTopologyVersion topVer) {
+ this.topVer = topVer;
+
+ ClusterNode node = mapKeyToNode(topVer);
+
+ if (node == null) {
+ assert isDone() : this;
+
+ return;
+ }
+
+ if (node.isLocal()) {
+ LinkedHashMap<KeyCacheObject, Boolean> map = U.newLinkedHashMap(1);
+
+ map.put(key, false);
+
+ final GridDhtFuture<Collection<GridCacheEntryInfo>> fut = cctx.dht().getDhtAsync(node.id(),
+ -1,
+ map,
+ readThrough,
+ topVer,
+ subjId,
+ taskName == null ? 0 : taskName.hashCode(),
+ expiryPlc,
+ skipVals);
+
+ final Collection<Integer> invalidParts = fut.invalidPartitions();
+
+ if (!F.isEmpty(invalidParts)) {
+ AffinityTopologyVersion updTopVer = cctx.discovery().topologyVersionEx();
+
+ assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology " +
+ "version did not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
+ ", invalidParts=" + invalidParts + ']';
+
+ // Remap recursively.
+ map(updTopVer);
+ }
+ else {
+ fut.listen(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>>() {
+ @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut) {
+ try {
+ Collection<GridCacheEntryInfo> infos = fut.get();
+
+ assert F.isEmpty(infos) || infos.size() == 1 : infos;
+
+ setResult(F.first(infos));
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to get values from dht cache [fut=" + fut + "]", e);
+
+ onDone(e);
+ }
+ }
+ });
+ }
+ }
+ else {
+ synchronized (this) {
+ this.node = node;
+ }
+
+ if (!trackable) {
+ trackable = true;
+
+ cctx.mvcc().addFuture(this, futId);
+ }
+
+ GridCacheMessage req;
+
+ if (node.version().compareTo(SINGLE_GET_MSG_SINCE) >= 0) {
+ req = new GridNearSingleGetRequest(cctx.cacheId(),
+ futId,
+ key,
+ readThrough,
+ topVer,
+ subjId,
+ taskName == null ? 0 : taskName.hashCode(),
+ expiryPlc != null ? expiryPlc.forAccess() : -1L,
+ skipVals,
+ /**add reader*/false,
+ needVer,
+ cctx.deploymentEnabled());
+ }
+ else {
+ LinkedHashMap<KeyCacheObject, Boolean> map = U.newLinkedHashMap(1);
+
+ map.put(key, false);
+
+ req = new GridNearGetRequest(
+ cctx.cacheId(),
+ futId,
+ futId,
+ cctx.versions().next(),
+ map,
+ readThrough,
+ topVer,
+ subjId,
+ taskName == null ? 0 : taskName.hashCode(),
+ expiryPlc != null ? expiryPlc.forAccess() : -1L,
+ skipVals,
+ cctx.deploymentEnabled());
+ }
+
+ try {
+ cctx.io().send(node, req, cctx.ioPolicy());
+ }
+ catch (IgniteCheckedException e) {
+ if (e instanceof ClusterTopologyCheckedException)
+ onNodeLeft(node.id());
+ else
+ onDone(e);
+ }
+ }
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @return Primary node or {@code null} if future was completed.
+ */
+ @Nullable private ClusterNode mapKeyToNode(AffinityTopologyVersion topVer) {
+ ClusterNode primary = affinityNode(key, topVer);
+
+ if (primary == null) {
+ onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'));
+
+ return null;
+ }
+
+ boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) || primary.isLocal();
+
+ if (allowLocRead) {
+ GridDhtCacheAdapter colocated = cctx.dht();
+
+ while (true) {
+ GridCacheEntryEx entry;
+
+ try {
+ entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
+ colocated.peekEx(key);
+
+ // If our DHT cache do has value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!skipVals,
+ subjId,
+ null,
+ taskName,
+ expiryPlc);
+
+ if (res != null) {
+ v = res.get1();
+ ver = res.get2();
+ }
+ }
+ else {
+ v = entry.innerGet(null,
+ /*swap*/true,
+ /*read-through*/false,
+ /*fail-fast*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!skipVals,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiryPlc);
+ }
+
+ colocated.context().evicts().touch(entry, topVer);
+
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null) {
+ if (isNew && entry.markObsoleteIfEmpty(ver))
+ colocated.removeIfObsolete(key);
+ }
+ else {
+ if (!skipVals && cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onRead(true);
+
+ if (!skipVals)
+ setResult(v, ver);
+ else
+ setSkipValueResult(true, ver);
+
+ return null;
+ }
+ }
+
+ break;
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ break;
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+
+ return null;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // No-op, will retry.
+ }
+ }
+ }
+
+ return primary;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param res Result.
+ */
+ public void onResult(UUID nodeId, GridNearSingleGetResponse res) {
+ if (!processResponse(nodeId) || !checkError(res.error(), res.invalidPartitions(), res.topologyVersion()))
+ return;
+
+ Message res0 = res.result();
+
+ if (needVer) {
+ CacheVersionedValue verVal = (CacheVersionedValue)res0;
+
+ if (verVal != null) {
+ if (skipVals)
+ setSkipValueResult(true, verVal.version());
+ else
+ setResult(verVal.value() , verVal.version());
+ }
+ else {
+ if (skipVals)
+ setSkipValueResult(false, null);
+ else
+ setResult(null , null);
+ }
+ }
+ else {
+ if (skipVals)
+ setSkipValueResult(res.containsValue(), null);
+ else
+ setResult((CacheObject)res0, null);
+ }
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param res Result.
+ */
+ public void onResult(UUID nodeId, GridNearGetResponse res) {
+ if (!processResponse(nodeId) ||
+ !checkError(res.error(), !F.isEmpty(res.invalidPartitions()), res.topologyVersion()))
+ return;
+
+ Collection<GridCacheEntryInfo> infos = res.entries();
+
+ assert F.isEmpty(infos) || infos.size() == 1 : infos;
+
+ setResult(F.first(infos));
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @return {@code True} if should process received response.
+ */
+ private boolean processResponse(UUID nodeId) {
+ synchronized (this) {
+ if (node != null && node.id().equals(nodeId)) {
+ node = null;
+
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * @param err Error.
+ * @param invalidParts Invalid partitions error flag.
+ * @param rmtTopVer Received topology version.
+ */
+ private boolean checkError(@Nullable IgniteCheckedException err,
+ boolean invalidParts,
+ AffinityTopologyVersion rmtTopVer) {
+ if (err != null) {
+ onDone(err);
+
+ return false;
+ }
+
+ if (invalidParts) {
+ assert !rmtTopVer.equals(AffinityTopologyVersion.ZERO);
+
+ if (rmtTopVer.compareTo(topVer) <= 0) {
+ // Fail the whole get future.
+ onDone(new IgniteCheckedException("Failed to process invalid partitions response (remote node reported " +
+ "invalid partitions but remote topology version does not differ from local) " +
+ "[topVer=" + topVer + ", rmtTopVer=" + rmtTopVer + ", part=" + cctx.affinity().partition(key) +
+ ", nodeId=" + node.id() + ']'));
+
+ return false;
+ }
+
+ if (canRemap) {
+ IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer.topologyVersion());
+
+ topFut.listen(new CIX1<IgniteInternalFuture<Long>>() {
+ @Override public void applyx(IgniteInternalFuture<Long> fut) {
+ try {
+ AffinityTopologyVersion topVer = new AffinityTopologyVersion(fut.get());
+
+ remap(topVer);
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+ });
+
+ }
+ else
+ map(topVer);
+
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * @param info Entry info.
+ */
+ private void setResult(@Nullable GridCacheEntryInfo info) {
+ assert info == null || skipVals == (info.value() == null);
+
+ if (skipVals) {
+ if (info != null)
+ setSkipValueResult(true, info.version());
+ else
+ setSkipValueResult(false, null);
+ }
+ else {
+ if (info != null)
+ setResult(info.value(), info.version());
+ else
+ setResult(null, null);
+ }
+ }
+
+ /**
+ * @param res Result.
+ * @param ver Version.
+ */
+ private void setSkipValueResult(boolean res, @Nullable GridCacheVersion ver) {
+ assert skipVals;
+
+ if (needVer) {
+ assert ver != null || !res;
+
+ onDone(new T2<>(res, ver));
+ }
+ else
+ onDone(res);
+ }
+
+ /**
+ * @param val Value.
+ * @param ver Version.
+ */
+ private void setResult(@Nullable CacheObject val, @Nullable GridCacheVersion ver) {
+ try {
+ assert !skipVals;
+
+ if (val != null) {
+ if (needVer) {
+ assert ver != null;
+
+ onDone(new T2<>(val, ver));
+ }
+ else {
+ if (!keepCacheObjects) {
+ Object res = CU.value(val, cctx, true);
+
+ if (deserializePortable && !skipVals)
+ res = cctx.unwrapPortableIfNeeded(res, false);
+
+ onDone(res);
+ }
+ else
+ onDone(val);
+ }
+ }
+ else
+ onDone(null);
+ }
+ catch (Exception e) {
+ onDone(e);
+ }
+ }
+
+ /**
+ * Affinity node to send get request to.
+ *
+ * @param key Key to get.
+ * @param topVer Topology version.
+ * @return Affinity node to get key from.
+ */
+ private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+ if (!canRemap) {
+ List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
+
+ for (ClusterNode node : affNodes) {
+ if (cctx.discovery().alive(node))
+ return node;
+ }
+
+ return null;
+ }
+ else
+ return cctx.affinity().primary(key, topVer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onNodeLeft(UUID nodeId) {
+ if (!processResponse(nodeId))
+ return false;
+
+ if (canRemap) {
+ final AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(
+ Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
+
+ cctx.affinity().affinityReadyFuture(updTopVer).listen(
+ new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ try {
+ fut.get();
+
+ remap(updTopVer);
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+ });
+ }
+ else
+ remap(topVer);
+
+ return true;
+ }
+
+ /**
+ * @param topVer Topology version.
+ */
+ private void remap(final AffinityTopologyVersion topVer) {
+ cctx.closures().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ map(topVer);
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(Object res, Throwable err) {
+ if (super.onDone(res, err)) {
+ // Don't forget to clean up.
+ if (trackable)
+ cctx.mvcc().removeFuture(futId);
+
+ cctx.dht().sendTtlUpdateRequest(expiryPlc);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean trackable() {
+ return trackable;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void markNotTrackable() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridPartitionedSingleGetFuture.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 7f9edb2..75f8c2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -65,11 +65,14 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheE
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -242,6 +245,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
+ ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() {
+ @Override public void apply(UUID nodeId, GridNearSingleGetRequest req) {
+ processNearSingleGetRequest(nodeId, req);
+ }
+ });
+
ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
@Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
processNearAtomicUpdateRequest(nodeId, req);
@@ -279,6 +288,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
processNearGetResponse(nodeId, res);
}
});
+
+ ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() {
+ @Override public void apply(UUID nodeId, GridNearSingleGetResponse res) {
+ processNearSingleGetResponse(nodeId, res);
+ }
+ });
}
}
@@ -301,6 +316,45 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/** {@inheritDoc} */
+ @Override protected IgniteInternalFuture<V> getAsync(final K key,
+ final boolean forcePrimary,
+ final boolean skipTx,
+ @Nullable UUID subjId,
+ final String taskName,
+ final boolean deserializePortable,
+ final boolean skipVals,
+ final boolean canRemap) {
+ ctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+ if (keyCheck)
+ validateCacheKey(key);
+
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+ subjId = ctx.subjectIdPerCall(null, opCtx);
+
+ final UUID subjId0 = subjId;
+
+ final ExpiryPolicy expiryPlc = skipVals ? null : opCtx != null ? opCtx.expiry() : null;
+
+ final boolean skipStore = opCtx != null && opCtx.skipStore();
+
+ return asyncOp(new CO<IgniteInternalFuture<V>>() {
+ @Override public IgniteInternalFuture<V> apply() {
+ return getAsync0(ctx.toCacheKeyObject(key),
+ forcePrimary,
+ subjId0,
+ taskName,
+ deserializePortable,
+ expiryPlc,
+ skipVals,
+ skipStore,
+ canRemap);
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteInternalFuture<Map<K, V>> getAllAsync(
@Nullable final Collection<? extends K> keys,
final boolean forcePrimary,
@@ -914,9 +968,57 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/**
+ * Entry point to all public API single get methods.
+ *
+ * @param key Key.
+ * @param forcePrimary Force primary flag.
+ * @param subjId Subject ID.
+ * @param taskName Task name.
+ * @param deserializePortable Deserialize portable flag.
+ * @param expiryPlc Expiry policy.
+ * @param skipVals Skip values flag.
+ * @param skipStore Skip store flag.
+ * @param canRemap Can remap flag.
+ * @return Get future.
+ */
+ private IgniteInternalFuture<V> getAsync0(KeyCacheObject key,
+ boolean forcePrimary,
+ UUID subjId,
+ String taskName,
+ boolean deserializePortable,
+ @Nullable ExpiryPolicy expiryPlc,
+ boolean skipVals,
+ boolean skipStore,
+ boolean canRemap
+ ) {
+ AffinityTopologyVersion topVer = canRemap ? ctx.affinity().affinityTopologyVersion() :
+ ctx.shared().exchange().readyAffinityVersion();
+
+ IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
+
+ GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx,
+ key,
+ topVer,
+ !skipStore,
+ forcePrimary,
+ subjId,
+ taskName,
+ deserializePortable,
+ expiry,
+ skipVals,
+ canRemap,
+ false,
+ false);
+
+ fut.init();
+
+ return (IgniteInternalFuture<V>)fut;
+ }
+
+ /**
* Entry point to all public API get methods.
*
- * @param keys Keys to remove.
+ * @param keys Keys.
* @param forcePrimary Force primary flag.
* @param subjId Subject ID.
* @param taskName Task name.
@@ -942,7 +1044,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
// Optimisation: try to resolve value locally and escape 'get future' creation.
- if (!forcePrimary) {
+ if (!forcePrimary && ctx.affinityNode()) {
Map<K, V> locVals = U.newHashMap(keys.size());
boolean success = true;
@@ -2409,27 +2511,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
* @param nodeId Sender node ID.
- * @param res Near get response.
- */
- private void processNearGetResponse(UUID nodeId, GridNearGetResponse res) {
- if (log.isDebugEnabled())
- log.debug("Processing near get response [nodeId=" + nodeId + ", res=" + res + ']');
-
- GridPartitionedGetFuture<K, V> fut = (GridPartitionedGetFuture<K, V>)ctx.mvcc().<Map<K, V>>future(
- res.version(), res.futureId());
-
- if (fut == null) {
- if (log.isDebugEnabled())
- log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
-
- return;
- }
-
- fut.onResult(nodeId, res);
- }
-
- /**
- * @param nodeId Sender node ID.
* @param req Near atomic update request.
*/
private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicUpdateRequest req) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 4ace5c4..c34dcfd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
@@ -48,7 +47,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -67,39 +65,42 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
protected static IgniteLogger log;
/** Cache context. */
- private GridCacheContext cctx;
+ private final GridCacheContext cctx;
/** Future version. */
- private GridCacheVersion futVer;
+ private final GridCacheVersion futVer;
/** Write version. */
- private GridCacheVersion writeVer;
+ private final GridCacheVersion writeVer;
/** Force transform backup flag. */
private boolean forceTransformBackups;
/** Completion callback. */
@GridToStringExclude
- private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
+ private final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
/** Mappings. */
@GridToStringInclude
- private ConcurrentMap<UUID, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>();
+ private final Map<UUID, GridDhtAtomicUpdateRequest> mappings;
/** Entries with readers. */
private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
/** Update request. */
- private GridNearAtomicUpdateRequest updateReq;
+ private final GridNearAtomicUpdateRequest updateReq;
/** Update response. */
- private GridNearAtomicUpdateResponse updateRes;
+ private final GridNearAtomicUpdateResponse updateRes;
/** Future keys. */
- private Collection<KeyCacheObject> keys;
+ private final Collection<KeyCacheObject> keys;
/** */
- private boolean waitForExchange;
+ private final boolean waitForExchange;
+
+ /** Response count. */
+ private volatile int resCnt;
/**
* @param cctx Cache context.
@@ -128,6 +129,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class);
keys = new ArrayList<>(updateReq.keys().size());
+ mappings = U.newHashMap(updateReq.keys().size());
boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest());
@@ -145,22 +147,37 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
}
/** {@inheritDoc} */
- @Override public Collection<? extends ClusterNode> nodes() {
- return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull());
- }
-
- /** {@inheritDoc} */
@Override public boolean onNodeLeft(UUID nodeId) {
if (log.isDebugEnabled())
log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']');
+ return registerResponse(nodeId);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @return {@code True} if request found.
+ */
+ private boolean registerResponse(UUID nodeId) {
+ int resCnt0;
+
GridDhtAtomicUpdateRequest req = mappings.get(nodeId);
if (req != null) {
- // Remove only after added keys to failed set.
- mappings.remove(nodeId);
+ synchronized (this) {
+ if (req.onResponse()) {
+ resCnt0 = resCnt;
+
+ resCnt0 += 1;
+
+ resCnt = resCnt0;
+ }
+ else
+ return false;
+ }
- checkComplete();
+ if (resCnt0 == mappings.size())
+ onDone();
return true;
}
@@ -343,18 +360,18 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
U.warn(log, "Failed to send update request to backup node because it left grid: " +
req.nodeId());
- mappings.remove(req.nodeId());
+ registerResponse(req.nodeId());
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send update request to backup node (did node leave the grid?): "
+ req.nodeId(), e);
- mappings.remove(req.nodeId());
+ registerResponse(req.nodeId());
}
}
}
-
- checkComplete();
+ else
+ onDone();
// Send response right away if no ACKs from backup is required.
// Backups will send ACKs anyway, future will be completed after all backups have replied.
@@ -389,9 +406,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
}
}
- mappings.remove(nodeId);
-
- checkComplete();
+ registerResponse(nodeId);
}
/**
@@ -403,22 +418,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
if (log.isDebugEnabled())
log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
- mappings.remove(nodeId);
-
- checkComplete();
- }
-
- /**
- * Checks if all required responses are received.
- */
- private void checkComplete() {
- // Always wait for replies from all backups.
- if (mappings.isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Completing DHT atomic update future: " + this);
-
- onDone();
- }
+ registerResponse(nodeId);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index e55cac9..1219f2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -139,6 +139,10 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
/** Task name hash. */
private int taskNameHash;
+ /** On response flag. Access should be synced on future. */
+ @GridDirectTransient
+ private boolean onRes;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -527,6 +531,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
}
/**
+ * @return {@code True} if on response flag changed.
+ */
+ public boolean onResponse() {
+ return !onRes && (onRes = true);
+ }
+
+ /**
* @return Optional arguments for entry processor.
*/
@Nullable public Object[] invokeArguments() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index ae662c8..a786803 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -238,11 +238,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
return state.futureVersion();
}
- /** {@inheritDoc} */
- @Override public Collection<? extends ClusterNode> nodes() {
- throw new UnsupportedOperationException();
- }
-
/**
* @return {@code True} if this future should block partition map exchange.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 7131aa5..47b7aea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -53,8 +53,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvali
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
@@ -67,6 +69,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -138,7 +141,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() {
@Override public void apply(UUID nodeId, GridNearGetResponse res) {
- processGetResponse(nodeId, res);
+ processNearGetResponse(nodeId, res);
+ }
+ });
+
+ ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() {
+ @Override public void apply(UUID nodeId, GridNearSingleGetResponse res) {
+ processNearSingleGetResponse(nodeId, res);
}
});
@@ -185,6 +194,80 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
}
/** {@inheritDoc} */
+ @Override protected IgniteInternalFuture<V> getAsync(final K key,
+ boolean forcePrimary,
+ boolean skipTx,
+ @Nullable UUID subjId,
+ String taskName,
+ final boolean deserializePortable,
+ final boolean skipVals,
+ boolean canRemap) {
+ ctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+ if (keyCheck)
+ validateCacheKey(key);
+
+ IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx);
+
+ final CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+ if (tx != null && !tx.implicit() && !skipTx) {
+ return asyncOp(tx, new AsyncOp<V>() {
+ @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
+ IgniteInternalFuture<Map<Object, Object>> fut = tx.getAllAsync(ctx,
+ Collections.singleton(ctx.toCacheKeyObject(key)),
+ deserializePortable,
+ skipVals,
+ false,
+ opCtx != null && opCtx.skipStore());
+
+ return fut.chain(new CX1<IgniteInternalFuture<Map<Object, Object>>, V>() {
+ @SuppressWarnings("unchecked")
+ @Override public V applyx(IgniteInternalFuture<Map<Object, Object>> e)
+ throws IgniteCheckedException {
+ Map<Object, Object> map = e.get();
+
+ assert map.isEmpty() || map.size() == 1 : map.size();
+
+ if (skipVals) {
+ Boolean val = map.isEmpty() ? false : (Boolean)F.firstValue(map);
+
+ return (V)(val);
+ }
+
+ return (V)map.get(key);
+ }
+ });
+ }
+ });
+ }
+
+ AffinityTopologyVersion topVer = tx == null ?
+ (canRemap ? ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) :
+ tx.topologyVersion();
+
+ subjId = ctx.subjectIdPerCall(subjId, opCtx);
+
+ GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx,
+ ctx.toCacheKeyObject(key),
+ topVer,
+ opCtx == null || !opCtx.skipStore(),
+ forcePrimary,
+ subjId,
+ taskName,
+ deserializePortable,
+ skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null),
+ skipVals,
+ canRemap,
+ /*needVer*/false,
+ /*keepCacheObjects*/false);
+
+ fut.init();
+
+ return (IgniteInternalFuture<V>)fut;
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteInternalFuture<Map<K, V>> getAllAsync(
@Nullable final Collection<? extends K> keys,
boolean forcePrimary,
@@ -290,6 +373,54 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
}
/**
+ * @param key Key to load.
+ * @param readThrough Read through flag.
+ * @param forcePrimary Force get from primary node flag.
+ * @param topVer Topology version.
+ * @param subjId Subject ID.
+ * @param taskName Task name.
+ * @param deserializePortable Deserialize portable flag.
+ * @param expiryPlc Expiry policy.
+ * @param skipVals Skip values flag.
+ * @param canRemap Flag indicating whether future can be remapped on a newer topology version.
+ * @param needVer If {@code true} returns values as tuples containing value and version.
+ * @param keepCacheObj Keep cache objects flag.
+ * @return Load future.
+ */
+ public final IgniteInternalFuture<Object> loadAsync(
+ KeyCacheObject key,
+ boolean readThrough,
+ boolean forcePrimary,
+ AffinityTopologyVersion topVer,
+ @Nullable UUID subjId,
+ String taskName,
+ boolean deserializePortable,
+ @Nullable IgniteCacheExpiryPolicy expiryPlc,
+ boolean skipVals,
+ boolean canRemap,
+ boolean needVer,
+ boolean keepCacheObj
+ ) {
+ GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx,
+ ctx.toCacheKeyObject(key),
+ topVer,
+ readThrough,
+ forcePrimary,
+ subjId,
+ taskName,
+ deserializePortable,
+ expiryPlc,
+ skipVals,
+ canRemap,
+ needVer,
+ keepCacheObj);
+
+ fut.init();
+
+ return fut;
+ }
+
+ /**
* @param keys Keys to load.
* @param readThrough Read through flag.
* @param forcePrimary Force get from primary node flag.
@@ -299,9 +430,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @param deserializePortable Deserialize portable flag.
* @param expiryPlc Expiry policy.
* @param skipVals Skip values flag.
- * @return Loaded values.
+ * @param canRemap Flag indicating whether future can be remapped on a newer topology version.
+ * @param needVer If {@code true} returns values as tuples containing value and version.
+ * @param keepCacheObj Keep cache objects flag.
+ * @return Load future.
*/
- public IgniteInternalFuture<Map<K, V>> loadAsync(
+ public final IgniteInternalFuture<Map<K, V>> loadAsync(
@Nullable Collection<KeyCacheObject> keys,
boolean readThrough,
boolean forcePrimary,
@@ -931,24 +1065,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
}
/**
- * @param nodeId Sender ID.
- * @param res Response.
- */
- private void processGetResponse(UUID nodeId, GridNearGetResponse res) {
- GridPartitionedGetFuture<K, V> fut = (GridPartitionedGetFuture<K, V>)ctx.mvcc().<Map<K, V>>future(
- res.version(), res.futureId());
-
- if (fut == null) {
- if (log.isDebugEnabled())
- log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
-
- return;
- }
-
- fut.onResult(nodeId, res);
- }
-
- /**
* @param nodeId Node ID.
* @param res Response.
*/
@@ -957,7 +1073,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
assert res != null;
GridDhtColocatedLockFuture fut = (GridDhtColocatedLockFuture)ctx.mvcc().
- <Boolean>future(res.version(), res.futureId());
+ <Boolean>mvccFuture(res.version(), res.futureId());
if (fut != null)
fut.onResult(nodeId, res);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index abeb509..8245d88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -35,10 +35,11 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheLockTimeoutException;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
@@ -78,7 +79,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
* Colocated cache lock future.
*/
public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture<Boolean>
- implements GridCacheFuture<Boolean> {
+ implements GridCacheMvccFuture<Boolean> {
/** */
private static final long serialVersionUID = 0L;
@@ -198,25 +199,16 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
valMap = new ConcurrentHashMap8<>(keys.size(), 1f);
}
- /**
- * @return Participating nodes.
- */
- @Override public Collection<? extends ClusterNode> nodes() {
- return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
- @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
- if (isMini(f))
- return ((MiniFuture)f).node();
-
- return cctx.discovery().localNode();
- }
- });
- }
-
/** {@inheritDoc} */
@Override public GridCacheVersion version() {
return lockVer;
}
+ /** {@inheritDoc} */
+ @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
+ return false;
+ }
+
/**
* @return Future ID.
*/
@@ -538,7 +530,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
log.debug("Completing future: " + this);
// Clean up.
- cctx.mvcc().removeFuture(this);
+ cctx.mvcc().removeMvccFuture(this);
if (timeoutObj != null)
cctx.time().removeTimeoutObject(timeoutObj);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java
index 1559a91..c14621a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java
@@ -53,7 +53,7 @@ public class CacheVersionedValue implements Message {
* @param val Cache value.
* @param ver Cache version.
*/
- CacheVersionedValue(CacheObject val, GridCacheVersion ver) {
+ public CacheVersionedValue(CacheObject val, GridCacheVersion ver) {
this.val = val;
this.ver = ver;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 3c3527a..eb0b637 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheValueCollection;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -292,8 +293,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
* @param res Response.
*/
protected void processGetResponse(UUID nodeId, GridNearGetResponse res) {
- GridNearGetFuture<K, V> fut = (GridNearGetFuture<K, V>)ctx.mvcc().<Map<K, V>>future(
- res.version(), res.futureId());
+ CacheGetFuture fut = (CacheGetFuture)ctx.mvcc().future(res.futureId());
if (fut == null) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index ae1d43c..dfaa44e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -170,24 +170,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
}
/** {@inheritDoc} */
- @Override public GridCacheVersion version() {
- return ver;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<? extends ClusterNode> nodes() {
- return
- F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<Map<K, V>>, ClusterNode>() {
- @Nullable @Override public ClusterNode apply(IgniteInternalFuture<Map<K, V>> f) {
- if (isMini(f))
- return ((MiniFuture)f).node();
-
- return cctx.discovery().localNode();
- }
- });
- }
-
- /** {@inheritDoc} */
@Override public boolean onNodeLeft(UUID nodeId) {
boolean found = false;
@@ -227,7 +209,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
if (super.onDone(res, err)) {
// Don't forget to clean up.
if (trackable)
- cctx.mvcc().removeFuture(this);
+ cctx.mvcc().removeFuture(futId);
cache().dht().sendTtlUpdateRequest(expiryPlc);
@@ -343,7 +325,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
if (!trackable) {
trackable = true;
- cctx.mvcc().addFuture(this);
+ cctx.mvcc().addFuture(this, futId);
}
MiniFuture fut = new MiniFuture(n, mappedKeys, saved, topVer);
@@ -386,6 +368,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
* @param saved Reserved near cache entries.
* @return Map.
*/
+ @SuppressWarnings("unchecked")
private Map<KeyCacheObject, GridNearCacheEntry> map(
KeyCacheObject key,
Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings,
@@ -538,11 +521,17 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
}
else {
K key0 = key.value(cctx.cacheObjectContext(), true);
- V val0 = v.value(cctx.cacheObjectContext(), true);
-
- val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable);
key0 = (K)cctx.unwrapPortableIfNeeded(key0, !deserializePortable);
+ V val0;
+
+ if (!skipVals) {
+ val0 = v.value(cctx.cacheObjectContext(), true);
+ val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable);
+ }
+ else
+ val0 = (V)Boolean.TRUE;
+
add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
}
}
@@ -618,28 +607,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
}
/**
- * Affinity node to send get request to.
- *
- * @param key Key to get.
- * @param topVer Topology version.
- * @return Affinity node to get key from.
- */
- private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
- if (!canRemap) {
- List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
-
- for (ClusterNode node : affNodes) {
- if (cctx.discovery().alive(node))
- return node;
- }
-
- return null;
- }
- else
- return cctx.affinity().primary(key, topVer);
- }
-
- /**
* @return Near cache.
*/
private GridNearCacheAdapter<K, V> cache() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index 8482217..6d60298 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -133,7 +133,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
) {
assert futId != null;
assert miniId != null;
- assert ver != null;
assert keys != null;
this.cacheId = cacheId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
index fc06ab1..15a791f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
@@ -100,8 +100,6 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
boolean addDepInfo
) {
assert futId != null;
- assert miniId != null;
- assert ver != null;
this.cacheId = cacheId;
this.futId = futId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 9c3701f..76f2fbe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -209,20 +209,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
valMap = new ConcurrentHashMap8<>(keys.size(), 1f);
}
- /**
- * @return Participating nodes.
- */
- @Override public Collection<? extends ClusterNode> nodes() {
- return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
- @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
- if (isMini(f))
- return ((MiniFuture)f).node();
-
- return cctx.discovery().localNode();
- }
- });
- }
-
/** {@inheritDoc} */
@Override public GridCacheVersion version() {
return lockVer;
@@ -672,7 +658,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
log.debug("Completing future: " + this);
// Clean up.
- cctx.mvcc().removeFuture(this);
+ cctx.mvcc().removeMvccFuture(this);
if (timeoutObj != null)
cctx.time().removeTimeoutObject(timeoutObj);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 1569b14..770c47a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -73,8 +73,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING;
/**
*
*/
-public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter
- implements GridCacheMvccFuture<IgniteInternalTx> {
+public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter {
/** */
public static final IgniteProductVersion SER_TX_SINCE = IgniteProductVersion.fromString("1.5.0");
@@ -148,18 +147,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
/** {@inheritDoc} */
- @Override public Collection<? extends ClusterNode> nodes() {
- return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
- @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
- if (isMini(f))
- return ((MiniFuture)f).node();
-
- return cctx.discovery().localNode();
- }
- });
- }
-
- /** {@inheritDoc} */
@Override public boolean onNodeLeft(UUID nodeId) {
boolean found = false;
@@ -287,7 +274,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
tx.setRollbackOnly();
// Don't forget to clean up.
- cctx.mvcc().removeFuture(this);
+ cctx.mvcc().removeMvccFuture(this);
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 82e3868..eaf476c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -35,7 +35,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
-import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
@@ -54,7 +53,6 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionTimeoutException;
import org.jetbrains.annotations.Nullable;
@@ -66,8 +64,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING;
/**
*
*/
-public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter
- implements GridCacheMvccFuture<IgniteInternalTx> {
+public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter {
/** */
@GridToStringInclude
private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
@@ -100,18 +97,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
/** {@inheritDoc} */
- @Override public Collection<? extends ClusterNode> nodes() {
- return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
- @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
- if (isMini(f))
- return ((MiniFuture)f).node();
-
- return cctx.discovery().localNode();
- }
- });
- }
-
- /** {@inheritDoc} */
@Override public boolean onNodeLeft(UUID nodeId) {
boolean found = false;
@@ -261,7 +246,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
if (super.onDone(tx, err0)) {
// Don't forget to clean up.
- cctx.mvcc().removeFuture(this);
+ cctx.mvcc().removeMvccFuture(this);
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 103105e..ffe5373 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -28,6 +28,8 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
@@ -42,7 +44,6 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -68,15 +69,6 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
}
/** {@inheritDoc} */
- @Override public Collection<? extends ClusterNode> nodes() {
- return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
- @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
- return ((MiniFuture)f).node();
- }
- });
- }
-
- /** {@inheritDoc} */
@Override public boolean onNodeLeft(UUID nodeId) {
boolean found = false;
@@ -280,6 +272,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
}
/** {@inheritDoc} */
+ @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean onDone(@Nullable IgniteInternalTx res, @Nullable Throwable err) {
if (err != null)
this.err.compareAndSet(null, err);
@@ -290,7 +287,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
tx.state(PREPARED);
if (super.onDone(tx, err)) {
- cctx.mvcc().removeFuture(this);
+ cctx.mvcc().removeMvccFuture(this);
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
new file mode 100644
index 0000000..a506007
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK;
+
+/**
+ *
+ */
+public class GridNearSingleGetRequest extends GridCacheMessage implements GridCacheDeployable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ public static final int READ_THROUGH_FLAG_MASK = 0x01;
+
+ /** */
+ public static final int SKIP_VALS_FLAG_MASK = 0x02;
+
+ /** */
+ public static final int ADD_READER_FLAG_MASK = 0x04;
+
+ /** */
+ public static final int NEED_VER_FLAG_MASK = 0x08;
+
+ /** */
+ public static final int NEED_ENTRY_INFO_FLAG_MASK = 0x10;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** */
+ private KeyCacheObject key;
+
+ /** Flags. */
+ private byte flags;
+
+ /** Topology version. */
+ private AffinityTopologyVersion topVer;
+
+ /** Subject ID. */
+ private UUID subjId;
+
+ /** Task name hash. */
+ private int taskNameHash;
+
+ /** TTL for read operation. */
+ private long accessTtl;
+
+ /**
+ * Empty constructor required for {@link Message}.
+ */
+ public GridNearSingleGetRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param futId Future ID.
+ * @param key Key.
+ * @param readThrough Read through flag.
+ * @param skipVals Skip values flag. When false, only boolean values will be returned indicating whether
+ * cache entry has a value.
+ * @param topVer Topology version.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash.
+ * @param accessTtl New TTL to set after entry is accessed, -1 to leave unchanged.
+ * @param addReader Add reader flag.
+ * @param needVer {@code True} if entry version is needed.
+ * @param addDepInfo Deployment info.
+ */
+ public GridNearSingleGetRequest(
+ int cacheId,
+ IgniteUuid futId,
+ KeyCacheObject key,
+ boolean readThrough,
+ @NotNull AffinityTopologyVersion topVer,
+ UUID subjId,
+ int taskNameHash,
+ long accessTtl,
+ boolean skipVals,
+ boolean addReader,
+ boolean needVer,
+ boolean addDepInfo
+ ) {
+ assert futId != null;
+ assert key != null;
+
+ this.cacheId = cacheId;
+ this.futId = futId;
+ this.key = key;
+ this.topVer = topVer;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
+ this.accessTtl = accessTtl;
+ this.addDepInfo = addDepInfo;
+
+ if (readThrough)
+ flags = (byte)(flags | READ_THROUGH_FLAG_MASK);
+
+ if (skipVals)
+ flags = (byte)(flags | SKIP_VALS_FLAG_MASK);
+
+ if (addReader)
+ flags = (byte)(flags | ADD_READER_FLAG_MASK);
+
+ if (needVer)
+ flags = (byte)(flags | NEED_VER_FLAG_MASK);
+ }
+
+ /**
+ * @return Key.
+ */
+ public KeyCacheObject key() {
+ return key;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Subject ID.
+ */
+ public UUID subjectId() {
+ return subjId;
+ }
+
+ /**
+ * Gets task name hash.
+ *
+ * @return Task name hash.
+ */
+ public int taskNameHash() {
+ return taskNameHash;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ @Override public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @return New TTL to set after entry is accessed, -1 to leave unchanged.
+ */
+ public long accessTtl() {
+ return accessTtl;
+ }
+
+ /**
+ * @return Read through flag.
+ */
+ public boolean readThrough() {
+ return (flags & SKIP_STORE_FLAG_MASK) != 0;
+ }
+
+ /**
+ * @return Read through flag.
+ */
+ public boolean skipValues() {
+ return (flags & SKIP_VALS_FLAG_MASK) != 0;
+ }
+
+ /**
+ * @return Add reader flag.
+ */
+ public boolean addReader() {
+ return (flags & ADD_READER_FLAG_MASK) != 0;
+ }
+
+ /**
+ * @return {@code True} if entry version is needed.
+ */
+ public boolean needVersion() {
+ return (flags & NEED_VER_FLAG_MASK) != 0;
+ }
+
+ /**
+ * @return {@code True} if full entry information is needed.
+ */
+ public boolean needEntryInfo() {
+ return (flags & NEED_ENTRY_INFO_FLAG_MASK) != 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ assert key != null;
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ prepareMarshalCacheObject(key, cctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ assert key != null;
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ key.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ accessTtl = reader.readLong("accessTtl");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ futId = reader.readIgniteUuid("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ key = reader.readMessage("key");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ subjId = reader.readUuid("subjId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridNearSingleGetRequest.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeLong("accessTtl", accessTtl))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeIgniteUuid("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeMessage("key", key))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeUuid("subjId", subjId))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return addDepInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 116;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 10;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNearSingleGetRequest.class, this);
+ }
+}