You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/11/20 02:49:18 UTC
[08/22] ignite git commit: Optimization for single key cache 'get' operation.
Optimization for single key cache 'get' operation.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1f103067
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1f103067
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1f103067
Branch: refs/heads/ignite-direct-marsh-opt
Commit: 1f1030670a6e7f9fbad1d939301c884f29b7885a
Parents: ba1d563
Author: sboikov <sb...@gridgain.com>
Authored: Thu Nov 19 17:29:04 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Nov 19 17:29:04 2015 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 12 +
.../processors/cache/GridCacheAdapter.java | 103 ++-
.../processors/cache/GridCacheAtomicFuture.java | 6 +
.../processors/cache/GridCacheFuture.java | 13 -
.../processors/cache/GridCacheIoManager.java | 50 +-
.../processors/cache/GridCacheMessage.java | 20 +-
.../processors/cache/GridCacheMvccFuture.java | 7 +
.../processors/cache/GridCacheMvccManager.java | 108 +--
.../distributed/GridCacheTxRecoveryFuture.java | 13 +-
.../dht/CacheDistributedGetFutureAdapter.java | 27 +-
.../cache/distributed/dht/CacheGetFuture.java | 32 +
.../distributed/dht/GridDhtCacheAdapter.java | 141 ++++
.../distributed/dht/GridDhtLockFuture.java | 16 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 9 +-
.../distributed/dht/GridDhtTxFinishFuture.java | 24 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 4 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 17 +-
.../dht/GridPartitionedGetFuture.java | 69 +-
.../dht/GridPartitionedSingleGetFuture.java | 697 +++++++++++++++++++
.../dht/atomic/GridDhtAtomicCache.java | 127 +++-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 84 +--
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 11 +
.../dht/atomic/GridNearAtomicUpdateFuture.java | 5 -
.../dht/colocated/GridDhtColocatedCache.java | 160 ++++-
.../colocated/GridDhtColocatedLockFuture.java | 26 +-
.../distributed/near/CacheVersionedValue.java | 2 +-
.../distributed/near/GridNearCacheAdapter.java | 4 +-
.../distributed/near/GridNearGetFuture.java | 57 +-
.../distributed/near/GridNearGetRequest.java | 1 -
.../distributed/near/GridNearGetResponse.java | 2 -
.../distributed/near/GridNearLockFuture.java | 16 +-
...arOptimisticSerializableTxPrepareFuture.java | 17 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 19 +-
.../GridNearPessimisticTxPrepareFuture.java | 19 +-
.../near/GridNearSingleGetRequest.java | 396 +++++++++++
.../near/GridNearSingleGetResponse.java | 321 +++++++++
.../near/GridNearTransactionalCache.java | 2 +-
.../near/GridNearTxFinishFuture.java | 24 +-
.../cache/distributed/near/GridNearTxLocal.java | 149 ++--
.../near/GridNearTxPrepareFutureAdapter.java | 6 +-
.../processors/cache/local/GridLocalCache.java | 4 +-
.../cache/local/GridLocalLockFuture.java | 5 -
.../cache/transactions/IgniteTxHandler.java | 19 +-
.../transactions/IgniteTxLocalAdapter.java | 2 +-
.../cache/transactions/IgniteTxManager.java | 2 +-
.../IgniteClientReconnectCacheTest.java | 11 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 75 ++
.../GridCacheConcurrentTxMultiNodeTest.java | 15 -
.../cache/GridCachePartitionedGetSelfTest.java | 3 +-
.../IgniteCacheAbstractStopBusySelfTest.java | 27 +-
.../IgniteCacheP2pUnmarshallingErrorTest.java | 184 +++--
.../CacheGetFutureHangsSelfTest.java | 6 +
.../GridCacheAbstractNodeRestartSelfTest.java | 2 +
.../IgniteCacheSingleGetMessageTest.java | 357 ++++++++++
.../GridCacheReplicatedMetricsSelfTest.java | 9 -
.../testsuites/IgniteCacheTestSuite4.java | 3 +
56 files changed, 2908 insertions(+), 632 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 2503eda..3548aac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -83,6 +83,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
@@ -696,6 +698,16 @@ public class GridIoMessageFactory implements MessageFactory {
break;
+ case 116:
+ msg = new GridNearSingleGetRequest();
+
+ break;
+
+ case 117:
+ msg = new GridNearSingleGetResponse();
+
+ break;
+
// [-3..114] - this
// [120..123] - DR
// [-4..-22] - SQL
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index cbb7486..562a0eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -599,11 +599,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> containsKeyAsync(K key) {
+ @Override public final IgniteInternalFuture<Boolean> containsKeyAsync(K key) {
A.notNull(key, "key");
- return getAllAsync(
- Collections.singletonList(key),
+ return (IgniteInternalFuture)getAsync(
+ key,
/*force primary*/false,
/*skip tx*/false,
/*subj id*/null,
@@ -611,15 +611,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/*deserialize portable*/false,
/*skip values*/true,
/*can remap*/true
- ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() {
- @Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException {
- Map<K, V> map = fut.get();
-
- assert map.isEmpty() || map.size() == 1 : map.size();
-
- return map.isEmpty() ? false : map.values().iterator().next() != null;
- }
- });
+ );
}
/** {@inheritDoc} */
@@ -1473,6 +1465,52 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
+ * @param key Key.
+ * @param forcePrimary Force primary.
+ * @param skipTx Skip tx.
+ * @param subjId Subj Id.
+ * @param taskName Task name.
+ * @param deserializePortable Deserialize portable.
+ * @param skipVals Skip values.
+ * @param canRemap Can remap flag.
+ * @return Future for the get operation.
+ */
+ protected IgniteInternalFuture<V> getAsync(
+ final K key,
+ boolean forcePrimary,
+ boolean skipTx,
+ @Nullable UUID subjId,
+ String taskName,
+ boolean deserializePortable,
+ final boolean skipVals,
+ boolean canRemap
+ ) {
+ return getAllAsync(Collections.singletonList(key),
+ forcePrimary,
+ skipTx,
+ subjId,
+ taskName,
+ deserializePortable,
+ skipVals,
+ canRemap).chain(
+ new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
+ @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
+ Map<K, V> map = e.get();
+
+ assert map.isEmpty() || map.size() == 1 : map.size();
+
+ if (skipVals) {
+ Boolean val = map.isEmpty() ? false : (Boolean)F.firstValue(map);
+
+ return (V)(val);
+ }
+
+ return map.get(key);
+ }
+ });
+ }
+
+ /**
* @param keys Keys.
* @param forcePrimary Force primary.
* @param skipTx Skip tx.
@@ -1524,7 +1562,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Future for the get operation.
* @see GridCacheAdapter#getAllAsync(Collection)
*/
- public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys,
+ public final IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys,
boolean readThrough,
boolean checkTx,
@Nullable final UUID subjId,
@@ -1605,11 +1643,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final boolean storeEnabled = !skipVals && readThrough && ctx.readThrough();
+ final boolean needEntry = storeEnabled || ctx.isSwapOrOffheapEnabled();
+
Map<KeyCacheObject, GridCacheVersion> misses = null;
for (KeyCacheObject key : keys) {
while (true) {
- GridCacheEntryEx entry = entryEx(key);
+ GridCacheEntryEx entry = needEntry ? entryEx(key) : peekEx(key);
+
+ if (entry == null) {
+ if (!skipVals && ctx.config().isStatisticsEnabled())
+ ctx.cache().metrics0().onRead(false);
+
+ break;
+ }
try {
T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(null,
@@ -4389,11 +4436,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
@Nullable public V get(K key, boolean deserializePortable)
throws IgniteCheckedException {
- Map<K, V> map = getAllAsync(F.asList(key), deserializePortable).get();
-
- assert map.isEmpty() || map.size() == 1 : map.size();
-
- return map.get(key);
+ return getAsync(key, deserializePortable).get();
}
/**
@@ -4409,16 +4452,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return new GridFinishedFuture<>(e);
}
- return getAllAsync(Collections.singletonList(key), deserializePortable).chain(
- new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
- @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
- Map<K, V> map = e.get();
-
- assert map.isEmpty() || map.size() == 1 : map.size();
+ String taskName = ctx.kernalContext().job().currentTaskName();
- return map.get(key);
- }
- });
+ return getAsync(key,
+ !ctx.config().isReadFromBackup(),
+ /*skip tx*/false,
+ null,
+ taskName,
+ deserializePortable,
+ false,
+ /*can remap*/true);
}
/**
@@ -4445,10 +4488,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return getAllAsync(keys,
!ctx.config().isReadFromBackup(),
/*skip tx*/false,
- null,
+ /*subject id*/null,
taskName,
deserializePortable,
- false,
+ /*skip vals*/false,
/*can remap*/true);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index be35c5c..359909e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -20,12 +20,18 @@ package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
/**
* Update future for atomic cache.
*/
public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
/**
+ * @return Future version.
+ */
+ public GridCacheVersion version();
+
+ /**
* Gets future that will be completed when it is safe when update is finished on the given version of topology.
*
* @param topVer Topology version to finish.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java
index caa3d3f..8bf8d40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java
@@ -17,11 +17,8 @@
package org.apache.ignite.internal.processors.cache;
-import java.util.Collection;
import java.util.UUID;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.lang.IgniteUuid;
/**
@@ -34,16 +31,6 @@ public interface GridCacheFuture<R> extends IgniteInternalFuture<R> {
public IgniteUuid futureId();
/**
- * @return Future version.
- */
- public GridCacheVersion version();
-
- /**
- * @return Involved nodes.
- */
- public Collection<? extends ClusterNode> nodes();
-
- /**
* Callback for when node left.
*
* @param nodeId Left node ID.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 2334780..9afbca8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -34,22 +34,24 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
@@ -437,7 +439,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
case 50: {
GridNearGetResponse res = (GridNearGetResponse)msg;
- GridCacheFuture fut = ctx.mvcc().future(res.version(), res.futureId());
+ CacheGetFuture fut = (CacheGetFuture)ctx.mvcc().future(res.futureId());
if (fut == null) {
if (log.isDebugEnabled())
@@ -448,10 +450,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
res.error(res.classError());
- if (fut instanceof GridNearGetFuture)
- ((GridNearGetFuture)fut).onResult(nodeId, res);
- else
- ((GridPartitionedGetFuture)fut).onResult(nodeId, res);
+ fut.onResult(nodeId, res);
}
break;
@@ -521,6 +520,43 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
break;
+ case 116: {
+ GridNearSingleGetRequest req = (GridNearSingleGetRequest)msg;
+
+ GridNearSingleGetResponse res = new GridNearSingleGetResponse(
+ ctx.cacheId(),
+ req.futureId(),
+ req.topologyVersion(),
+ null,
+ false,
+ req.deployInfo() != null);
+
+ res.error(req.classError());
+
+ sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ }
+
+ break;
+
+ case 117: {
+ GridNearSingleGetResponse res = (GridNearSingleGetResponse)msg;
+
+ GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)ctx.mvcc().future(res.futureId());
+
+ if (fut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
+
+ return;
+ }
+
+ res.error(res.classError());
+
+ fut.onResult(nodeId, res);
+ }
+
+ break;
+
default:
throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
+ msg + "]");
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index bdd2118..61136bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -499,15 +499,21 @@ public abstract class GridCacheMessage implements Message {
int size = col.size();
- for (int i = 0 ; i < size; i++) {
- CacheObject obj = col.get(i);
+ for (int i = 0 ; i < size; i++)
+ prepareMarshalCacheObject(col.get(i), ctx);
+ }
- if (obj != null) {
- obj.prepareMarshal(ctx.cacheObjectContext());
+ /**
+ * @param obj Object.
+ * @param ctx Context.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected final void prepareMarshalCacheObject(CacheObject obj, GridCacheContext ctx) throws IgniteCheckedException {
+ if (obj != null) {
+ obj.prepareMarshal(ctx.cacheObjectContext());
- if (addDepInfo)
- prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx);
- }
+ if (addDepInfo)
+ prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java
index 67c1330..080a6f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java
@@ -17,11 +17,18 @@
package org.apache.ignite.internal.processors.cache;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+
/**
* Distributed future aware of MVCC locking.
*/
public interface GridCacheMvccFuture<T> extends GridCacheFuture<T> {
/**
+ * @return Future version.
+ */
+ public GridCacheVersion version();
+
+ /**
* @param entry Entry which received new owner.
* @param owner Owner.
* @return {@code True} if future cares about this entry.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 8562f37..9104acb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -31,7 +30,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -101,12 +99,15 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
/** Active futures mapped by version ID. */
@GridToStringExclude
- private final ConcurrentMap<GridCacheVersion, Collection<GridCacheFuture<?>>> futs = newMap();
+ private final ConcurrentMap<GridCacheVersion, Collection<GridCacheMvccFuture<?>>> mvccFuts = newMap();
/** Pending atomic futures. */
private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> atomicFuts =
new ConcurrentHashMap8<>();
+ /** */
+ private final ConcurrentMap<IgniteUuid, GridCacheFuture<?>> futs = new ConcurrentHashMap8<>();
+
/** Near to DHT version mapping. */
private final ConcurrentMap<GridCacheVersion, GridCacheVersion> near2dht = newMap();
@@ -137,12 +138,12 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
prev + ']');
if (owner != null && (owner.local() || owner.nearLocal())) {
- Collection<? extends GridCacheFuture> futCol = futs.get(owner.version());
+ Collection<GridCacheMvccFuture<?>> futCol = mvccFuts.get(owner.version());
if (futCol != null) {
synchronized (futCol) {
- for (GridCacheFuture fut : futCol) {
- if (fut instanceof GridCacheMvccFuture && !fut.isDone()) {
+ for (GridCacheMvccFuture<?> fut : futCol) {
+ if (!fut.isDone()) {
GridCacheMvccFuture<Boolean> mvccFut = (GridCacheMvccFuture<Boolean>)fut;
// Since this method is called outside of entry synchronization,
@@ -206,18 +207,14 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
for (GridCacheFuture<?> fut : activeFutures())
fut.onNodeLeft(discoEvt.eventNode().id());
- for (IgniteInternalFuture<?> fut : atomicFuts.values()) {
- if (fut instanceof GridCacheFuture) {
- GridCacheFuture cacheFut = (GridCacheFuture)fut;
+ for (GridCacheAtomicFuture<?> cacheFut : atomicFuts.values()) {
+ cacheFut.onNodeLeft(discoEvt.eventNode().id());
- cacheFut.onNodeLeft(discoEvt.eventNode().id());
+ if (cacheFut.isCancelled() || cacheFut.isDone()) {
+ GridCacheVersion futVer = cacheFut.version();
- if (cacheFut.isCancelled() || cacheFut.isDone()) {
- GridCacheVersion futVer = cacheFut.version();
-
- if (futVer != null)
- atomicFuts.remove(futVer, fut);
- }
+ if (futVer != null)
+ atomicFuts.remove(futVer, cacheFut);
}
}
}
@@ -261,12 +258,14 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
public Collection<GridCacheFuture<?>> activeFutures() {
ArrayList<GridCacheFuture<?>> col = new ArrayList<>();
- for (Collection<GridCacheFuture<?>> verFuts : futs.values()) {
- synchronized (verFuts) {
- col.addAll(verFuts);
+ for (Collection<GridCacheMvccFuture<?>> futs : mvccFuts.values()) {
+ synchronized (futs) {
+ col.addAll(futs);
}
}
+ col.addAll(futs.values());
+
return col;
}
@@ -420,13 +419,25 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param fut Future.
+ * @param futId Future ID.
+ */
+ public void addFuture(final GridCacheFuture<?> fut, final IgniteUuid futId) {
+ GridCacheFuture<?> old = futs.put(futId, fut);
+
+ assert old == null : old;
+
+ onFutureAdded(fut);
+ }
+
+ /**
* Adds future.
*
* @param fut Future.
* @return {@code True} if added.
*/
@SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"})
- public boolean addFuture(final GridCacheFuture<?> fut) {
+ public boolean addFuture(final GridCacheMvccFuture<?> fut) {
if (fut.isDone()) {
fut.markNotTrackable();
@@ -437,10 +448,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
return true;
while (true) {
- Collection<GridCacheFuture<?>> old = futs.get(fut.version());
+ Collection<GridCacheMvccFuture<?>> old = mvccFuts.get(fut.version());
if (old == null) {
- Collection<GridCacheFuture<?>> col = new HashSet<GridCacheFuture<?>>(U.capacity(4), 0.75f) {
+ Collection<GridCacheMvccFuture<?>> col = new HashSet<GridCacheMvccFuture<?>>(U.capacity(4), 0.75f) {
{
// Make sure that we add future to queue before
// adding queue to the map of futures.
@@ -456,7 +467,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
};
- old = futs.putIfAbsent(fut.version(), col);
+ old = mvccFuts.putIfAbsent(fut.version(), col);
}
if (old != null) {
@@ -471,7 +482,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
// Future is being removed, so we force-remove here and try again.
if (empty) {
- if (futs.remove(fut.version(), old)) {
+ if (mvccFuts.remove(fut.version(), old)) {
if (log.isDebugEnabled())
log.debug("Removed future list from futures map for lock version: " + fut.version());
}
@@ -501,16 +512,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
break;
}
- // Close window in case of node is gone before the future got added to
- // the map of futures.
- for (ClusterNode n : fut.nodes()) {
- if (cctx.discovery().node(n.id()) == null)
- fut.onNodeLeft(n.id());
- }
-
// Just in case if future was completed before it was added.
if (fut.isDone())
- removeFuture(fut);
+ removeMvccFuture(fut);
else
onFutureAdded(fut);
@@ -537,15 +541,22 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param futId Future ID.
+ */
+ public void removeFuture(IgniteUuid futId) {
+ futs.remove(futId);
+ }
+
+ /**
* @param fut Future to remove.
* @return {@code True} if removed.
*/
@SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"})
- public boolean removeFuture(GridCacheFuture<?> fut) {
+ public boolean removeMvccFuture(GridCacheMvccFuture<?> fut) {
if (!fut.trackable())
return true;
- Collection<GridCacheFuture<?>> cur = futs.get(fut.version());
+ Collection<GridCacheMvccFuture<?>> cur = mvccFuts.get(fut.version());
if (cur == null)
return false;
@@ -565,7 +576,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
else if (log.isDebugEnabled())
log.debug("Attempted to remove a non-registered future (has it been already removed?): " + fut);
- if (empty && futs.remove(fut.version(), cur))
+ if (empty && mvccFuts.remove(fut.version(), cur))
if (log.isDebugEnabled())
log.debug("Removed future list from futures map for lock version: " + fut.version());
@@ -580,12 +591,12 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* @return Future.
*/
@SuppressWarnings({"unchecked"})
- @Nullable public GridCacheFuture future(GridCacheVersion ver, IgniteUuid futId) {
- Collection<? extends GridCacheFuture> futs = this.futs.get(ver);
+ @Nullable public GridCacheMvccFuture<?> mvccFuture(GridCacheVersion ver, IgniteUuid futId) {
+ Collection<GridCacheMvccFuture<?>> futs = this.mvccFuts.get(ver);
if (futs != null) {
synchronized (futs) {
- for (GridCacheFuture<?> fut : futs) {
+ for (GridCacheMvccFuture<?> fut : futs) {
if (fut.futureId().equals(futId)) {
if (log.isDebugEnabled())
log.debug("Found future in futures map: " + fut);
@@ -603,22 +614,11 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
- * Gets all futures for given lock version, possibly empty collection.
- *
- * @param ver Version.
- * @return All futures for given lock version.
+ * @param futId Future ID.
+ * @return Found future.
*/
- @SuppressWarnings("unchecked")
- public <T> Collection<? extends IgniteInternalFuture<T>> futures(GridCacheVersion ver) {
- Collection c = futs.get(ver);
-
- if (c == null)
- return Collections.<IgniteInternalFuture<T>>emptyList();
- else {
- synchronized (c) {
- return new ArrayList<>((Collection<IgniteInternalFuture<T>>)c);
- }
- }
+ @Nullable public GridCacheFuture future(IgniteUuid futId) {
+ return futs.get(futId);
}
/**
@@ -913,7 +913,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
X.println(">>> Mvcc manager memory stats [grid=" + cctx.gridName() + ']');
X.println(">>> rmvLocksSize: " + rmvLocks.sizex());
X.println(">>> lockedSize: " + locked.size());
- X.println(">>> futsSize: " + futs.size());
+ X.println(">>> futsSize: " + (mvccFuts.size() + futs.size()));
X.println(">>> near2dhtSize: " + near2dht.size());
X.println(">>> finishFutsSize: " + finishFuts.sizex());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index 01c4867..1648de0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -29,7 +29,6 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -389,16 +388,6 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
}
/** {@inheritDoc} */
- @Override public GridCacheVersion version() {
- return tx.xidVersion();
- }
-
- /** {@inheritDoc} */
- @Override public Collection<? extends ClusterNode> nodes() {
- return nodes.values();
- }
-
- /** {@inheritDoc} */
@Override public boolean onNodeLeft(UUID nodeId) {
for (IgniteInternalFuture<?> fut : futures())
if (isMini(fut)) {
@@ -424,7 +413,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
if (super.onDone(res, err)) {
- cctx.mvcc().removeFuture(this);
+ cctx.mvcc().removeFuture(futId);
if (err == null) {
assert res != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index 721ba4e..245ffc6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -18,9 +18,12 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
@@ -40,7 +43,7 @@ import static org.apache.ignite.IgniteSystemProperties.getInteger;
*
*/
public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoundIdentityFuture<Map<K, V>>
- implements GridCacheFuture<Map<K, V>> {
+ implements GridCacheFuture<Map<K, V>>, CacheGetFuture {
/** Default max remap count value. */
public static final int DFLT_MAX_REMAP_CNT = 3;
@@ -155,4 +158,26 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun
map.put(key, new T2<>(skipVals ? true : val, ver));
}
+
+ /**
+ * Affinity node to send get request to.
+ *
+ * @param key Key to get.
+ * @param topVer Topology version.
+ * @return Affinity node to get key from, or {@code null} if remap is disabled and no affinity node for the key is alive.
+ */
+ protected final ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+ if (!canRemap) {
+ List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
+
+ for (ClusterNode node : affNodes) {
+ if (cctx.discovery().alive(node))
+ return node;
+ }
+
+ return null;
+ }
+ else
+ return cctx.affinity().primary(key, topVer);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetFuture.java
new file mode 100644
index 0000000..ebe2cff
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetFuture.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
+
+/**
+ * Implemented by get futures that process {@link GridNearGetResponse} messages.
+ */
+public interface CacheGetFuture {
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ */
+ public void onResult(UUID nodeId, GridNearGetResponse res);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index bdd1140..8537357 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -57,9 +57,12 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCa
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -76,6 +79,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -110,6 +114,46 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
}
/**
+ * @param nodeId Sender node ID.
+ * @param res Near get response.
+ */
+ protected final void processNearGetResponse(UUID nodeId, GridNearGetResponse res) {
+ if (log.isDebugEnabled())
+ log.debug("Processing near get response [nodeId=" + nodeId + ", res=" + res + ']');
+
+ CacheGetFuture fut = (CacheGetFuture)ctx.mvcc().future(res.futureId());
+
+ if (fut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
+
+ return;
+ }
+
+ fut.onResult(nodeId, res);
+ }
+
+ /**
+ * @param nodeId Sender node ID.
+ * @param res Near single get response.
+ */
+ protected void processNearSingleGetResponse(UUID nodeId, GridNearSingleGetResponse res) {
+ if (log.isDebugEnabled())
+ log.debug("Processing near get response [nodeId=" + nodeId + ", res=" + res + ']');
+
+ GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)ctx.mvcc().future(res.futureId());
+
+ if (fut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
+
+ return;
+ }
+
+ fut.onResult(nodeId, res);
+ }
+
+ /**
* @param ctx Context.
*/
protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) {
@@ -669,6 +713,103 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
* @param nodeId Node ID.
* @param req Get request.
*/
+ protected void processNearSingleGetRequest(final UUID nodeId, final GridNearSingleGetRequest req) {
+ assert ctx.affinityNode();
+
+ long ttl = req.accessTtl();
+
+ final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(ttl);
+
+ LinkedHashMap<KeyCacheObject, Boolean> map = U.newLinkedHashMap(1);
+
+ map.put(req.key(), req.addReader());
+
+ IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut =
+ getDhtAsync(nodeId,
+ req.messageId(),
+ map,
+ req.readThrough(),
+ req.topologyVersion(),
+ req.subjectId(),
+ req.taskNameHash(),
+ expiryPlc,
+ req.skipValues());
+
+ fut.listen(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>>() {
+ @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> f) {
+ GridNearSingleGetResponse res;
+
+ GridDhtFuture<Collection<GridCacheEntryInfo>> fut =
+ (GridDhtFuture<Collection<GridCacheEntryInfo>>)f;
+
+ try {
+ Collection<GridCacheEntryInfo> entries = fut.get();
+
+ if (F.isEmpty(fut.invalidPartitions())) {
+ GridCacheEntryInfo info = F.first(entries);
+
+ Message res0 = null;
+
+ if (info != null) {
+ if (req.needEntryInfo()) {
+ info.key(null);
+
+ res0 = info;
+ } else if (req.needVersion())
+ res0 = new CacheVersionedValue(info.value(), info.version());
+ else
+ res0 = info.value();
+ }
+
+ res = new GridNearSingleGetResponse(ctx.cacheId(),
+ req.futureId(),
+ req.topologyVersion(),
+ res0,
+ false,
+ req.addDeploymentInfo());
+
+ if (info != null && req.skipValues())
+ res.setContainsValue();
+ }
+ else {
+ res = new GridNearSingleGetResponse(ctx.cacheId(),
+ req.futureId(),
+ ctx.shared().exchange().readyAffinityVersion(),
+ null,
+ true,
+ req.addDeploymentInfo());
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed processing get request: " + req, e);
+
+ res = new GridNearSingleGetResponse(ctx.cacheId(),
+ req.futureId(),
+ req.topologyVersion(),
+ null,
+ false,
+ req.addDeploymentInfo());
+
+ res.error(e);
+ }
+
+ try {
+ ctx.io().send(nodeId, res, ctx.ioPolicy());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId +
+ ",req=" + req + ", res=" + res + ']', e);
+ }
+
+ sendTtlUpdateRequest(expiryPlc);
+ }
+ });
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param req Get request.
+ */
protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest req) {
assert ctx.affinityNode();
assert !req.reload() : req;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 7284fd4..a7978c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -262,20 +262,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
log.debug("Added invalid partition to future [invalidPart=" + invalidPart + ", fut=" + this + ']');
}
- /**
- * @return Participating nodes.
- */
- @Override public Collection<? extends ClusterNode> nodes() {
- return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
- @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
- if (isMini(f))
- return ((MiniFuture)f).node();
-
- return cctx.discovery().localNode();
- }
- });
- }
-
/** {@inheritDoc} */
@Override public GridCacheVersion version() {
return lockVer;
@@ -756,7 +742,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
log.debug("Completing future: " + this);
// Clean up.
- cctx.mvcc().removeFuture(this);
+ cctx.mvcc().removeMvccFuture(this);
if (timeoutObj != null)
cctx.time().removeTimeoutObject(timeoutObj);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 35f63e3..2468cf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxRemote;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
@@ -126,6 +127,12 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
}
});
+ ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() {
+ @Override public void apply(UUID nodeId, GridNearSingleGetRequest req) {
+ processNearSingleGetRequest(nodeId, req);
+ }
+ });
+
ctx.io().addHandler(ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest>() {
@Override public void apply(UUID nodeId, GridNearLockRequest req) {
processNearLockRequest(nodeId, req);
@@ -566,7 +573,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
private void processDhtLockResponse(UUID nodeId, GridDhtLockResponse res) {
assert nodeId != null;
assert res != null;
- GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().<Boolean>future(res.version(), res.futureId());
+ GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().<Boolean>mvccFuture(res.version(), res.futureId());
if (fut == null) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 992bd66..bb370a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -66,7 +66,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
private GridCacheSharedContext<K, V> cctx;
/** Future ID. */
- private IgniteUuid futId;
+ private final IgniteUuid futId;
/** Transaction. */
@GridToStringExclude
@@ -115,26 +115,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
}
/** {@inheritDoc} */
- @Override public GridCacheVersion version() {
- return tx.xidVersion();
- }
-
- /**
- * @return Involved nodes.
- */
- @Override public Collection<? extends ClusterNode> nodes() {
- return
- F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
- @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
- if (isMini(f))
- return ((MiniFuture)f).node();
-
- return cctx.discovery().localNode();
- }
- });
- }
-
- /** {@inheritDoc} */
@Override public boolean onNodeLeft(UUID nodeId) {
for (IgniteInternalFuture<?> fut : futures())
if (isMini(fut)) {
@@ -228,7 +208,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
this.tx.sendFinishReply(commit, error());
// Don't forget to clean up.
- cctx.mvcc().removeFuture(this);
+ cctx.mvcc().removeFuture(futId);
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 2bed931..f344d48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -490,7 +490,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*commit*/true);
- cctx.mvcc().addFuture(fut);
+ cctx.mvcc().addFuture(fut, fut.futureId());
GridDhtTxPrepareFuture prep = prepFut.get();
@@ -580,7 +580,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*rollback*/false);
- cctx.mvcc().addFuture(fut);
+ cctx.mvcc().addFuture(fut, fut.futureId());
if (prepFut == null) {
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index d081c0c..4cb5d05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -243,21 +243,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
return tx.xidVersion();
}
- /**
- * @return Involved nodes.
- */
- @Override public Collection<? extends ClusterNode> nodes() {
- return
- F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
- @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
- if (isMini(f))
- return ((MiniFuture)f).node();
-
- return cctx.discovery().localNode();
- }
- });
- }
-
/** {@inheritDoc} */
@Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
if (log.isDebugEnabled())
@@ -823,7 +808,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (super.onDone(res, err.get())) {
// Don't forget to clean up.
- cctx.mvcc().removeFuture(this);
+ cctx.mvcc().removeMvccFuture(this);
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index febe9ba..c3d9836 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
@@ -57,10 +56,11 @@ import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture.*;
+
/**
* Colocated get future.
*/
@@ -71,15 +71,15 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+ /** Dummy version sent to older nodes for backward compatibility. */
+ private static final GridCacheVersion DUMMY_VER = new GridCacheVersion(0, 0, 0, 0);
+
/** Logger. */
private static IgniteLogger log;
/** Topology version. */
private AffinityTopologyVersion topVer;
- /** Version. */
- private GridCacheVersion ver;
-
/**
* @param cctx Context.
* @param keys Keys.
@@ -126,8 +126,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
this.topVer = topVer;
- ver = cctx.versions().next();
-
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, GridPartitionedGetFuture.class);
}
@@ -160,25 +158,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
}
/** {@inheritDoc} */
- @Override public GridCacheVersion version() {
- return ver;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public Collection<? extends ClusterNode> nodes() {
- return
- F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<Map<K, V>>, ClusterNode>() {
- @Nullable @Override public ClusterNode apply(IgniteInternalFuture<Map<K, V>> f) {
- if (isMini(f))
- return ((MiniFuture)f).node();
-
- return cctx.discovery().localNode();
- }
- });
- }
-
- /** {@inheritDoc} */
@Override public boolean onNodeLeft(UUID nodeId) {
boolean found = false;
@@ -219,7 +198,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
if (super.onDone(res, err)) {
// Don't forget to clean up.
if (trackable)
- cctx.mvcc().removeFuture(this);
+ cctx.mvcc().removeFuture(futId);
cache().sendTtlUpdateRequest(expiryPlc);
@@ -274,9 +253,11 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
add(new GridFinishedFuture<>(locVals));
if (hasRmtNodes) {
- trackable = true;
+ if (!trackable) {
+ trackable = true;
- cctx.mvcc().addFuture(this);
+ cctx.mvcc().addFuture(this, futId);
+ }
}
// Create mini futures.
@@ -343,7 +324,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
cctx.cacheId(),
futId,
fut.futureId(),
- ver,
+ n.version().compareTo(SINGLE_GET_MSG_SINCE) >= 0 ? null : DUMMY_VER,
mappedKeys,
readThrough,
topVer,
@@ -390,7 +371,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
boolean remote = false;
// Allow to get cached value from the local node.
- boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(), key, topVer);
+ boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) ||
+ cctx.affinity().primary(cctx.localNode(), key, topVer);
while (true) {
GridCacheEntryEx entry;
@@ -521,28 +503,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
}
/**
- * Finds affinity node to send get request to.
- *
- * @param key Key to get.
- * @param topVer Topology version.
- * @return Affinity node from which the key will be requested.
- */
- private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
- if (!canRemap) {
- List<ClusterNode> nodes = cctx.affinity().nodes(key, topVer);
-
- for (ClusterNode node : nodes) {
- if (cctx.discovery().alive(node))
- return node;
- }
-
- return null;
- }
- else
- return cctx.affinity().primary(key, topVer);
- }
-
- /**
* @param infos Entry infos.
* @return Result map.
*/
@@ -557,7 +517,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
if (needVer)
versionedResult(map, info.key(), info.value(), info.version());
- else
+ else {
cctx.addResult(map,
info.key(),
info.value(),
@@ -565,6 +525,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
keepCacheObjects,
deserializePortable,
false);
+ }
}
return map;