You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/12 11:24:18 UTC
[04/10] ignite git commit: ignite-5932
ignite-5932
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/af887544
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/af887544
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/af887544
Branch: refs/heads/ignite-5932
Commit: af887544099dd3e2f427b74a029f601ffddb4471
Parents: 1780062
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 12 12:30:02 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 12 12:30:02 2017 +0300
----------------------------------------------------------------------
.../dht/GridPartitionedGetFuture.java | 15 ++++++++--
.../dht/GridPartitionedSingleGetFuture.java | 21 ++++++++++----
.../dht/atomic/GridDhtAtomicCache.java | 6 ++--
.../dht/colocated/GridDhtColocatedCache.java | 30 +++++++++++++-------
.../cache/distributed/near/GridNearTxLocal.java | 23 ++++++++++-----
5 files changed, 68 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/af887544/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 7993d05..7689a4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -78,6 +78,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
private static IgniteLogger log;
/** */
+ protected final MvccCoordinatorVersion mvccVer;
+
+ /** */
private MvccQueryTracker mvccTracker;
/**
@@ -94,6 +97,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
* @param skipVals Skip values flag.
* @param needVer If {@code true} returns values as tuples containing value and version.
* @param keepCacheObjects Keep cache objects flag.
+ * @param mvccVer Mvcc version.
*/
public GridPartitionedGetFuture(
GridCacheContext<K, V> cctx,
@@ -107,7 +111,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean skipVals,
boolean needVer,
- boolean keepCacheObjects
+ boolean keepCacheObjects,
+ @Nullable MvccCoordinatorVersion mvccVer
) {
super(cctx,
keys,
@@ -121,6 +126,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
needVer,
keepCacheObjects,
recovery);
+ assert mvccVer == null || cctx.mvccEnabled();
+
+ this.mvccVer = mvccVer;
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, GridPartitionedGetFuture.class);
@@ -133,6 +141,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
if (!cctx.mvccEnabled())
return null;
+ if (mvccVer != null)
+ return mvccVer;
+
MvccCoordinatorVersion ver = mvccTracker.mvccVersion();
assert ver != null : "[fut=" + this + ", mvccTracker=" + mvccTracker + "]";
@@ -158,7 +169,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
}
- if (cctx.mvccEnabled()) {
+ if (cctx.mvccEnabled() && mvccVer == null) {
mvccTracker = new MvccQueryTracker(cctx, canRemap, this);
trackable = true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/af887544/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index b34687f..afef744 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -41,11 +41,12 @@ import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -122,6 +123,9 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
@GridToStringInclude
private ClusterNode node;
+ /** */
+ protected final MvccCoordinatorVersion mvccVer;
+
/**
* @param cctx Context.
* @param key Key.
@@ -149,9 +153,11 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
boolean skipVals,
boolean needVer,
boolean keepCacheObjects,
- boolean recovery
+ boolean recovery,
+ @Nullable MvccCoordinatorVersion mvccVer
) {
assert key != null;
+ assert mvccVer == null || cctx.mvccEnabled();
AffinityTopologyVersion lockedTopVer = cctx.shared().lockedTopologyVersion(null);
@@ -176,6 +182,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
this.keepCacheObjects = keepCacheObjects;
this.recovery = recovery;
this.topVer = topVer;
+ this.mvccVer = mvccVer;
futId = IgniteUuid.randomUuid();
@@ -275,6 +282,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
cctx.mvcc().addFuture(this, futId);
}
+ // TODO IGNITE-3478.
GridCacheMessage req = new GridNearSingleGetRequest(cctx.cacheId(),
futId.localId(),
key,
@@ -355,7 +363,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
boolean skipEntry = readNoEntry;
if (readNoEntry) {
- CacheDataRow row = cctx.offheap().read(cctx, key); // TODO IGNITE-3478
+ CacheDataRow row = mvccVer != null ? cctx.offheap().mvccRead(cctx, key, mvccVer) :
+ cctx.offheap().read(cctx, key);
if (row != null) {
long expireTime = row.expireTime();
@@ -398,8 +407,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
taskName,
expiryPlc,
true,
- null,
- null); // TODO IGNITE-3478
+ mvccVer,
+ null);
if (res != null) {
v = res.value();
@@ -418,7 +427,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
taskName,
expiryPlc,
true,
- null); // TODO IGNITE-3478
+ mvccVer);
}
colocated.context().evicts().touch(entry, topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/af887544/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 16416cc..d6862fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1385,7 +1385,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
skipVals,
needVer,
false,
- recovery);
+ recovery,
+ null);
fut.init();
@@ -1591,7 +1592,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
expiry,
skipVals,
needVer,
- false);
+ false,
+ null);
fut.init(topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/af887544/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 7364cb3..c975edb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
@@ -241,7 +242,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
skipVals,
needVer,
/*keepCacheObjects*/false,
- opCtx != null && opCtx.recovery());
+ opCtx != null && opCtx.recovery(),
+ null);
fut.init();
@@ -319,7 +321,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @param needVer Need version.
* @return Loaded values.
*/
- public IgniteInternalFuture<Map<K, V>> loadAsync(
+ private IgniteInternalFuture<Map<K, V>> loadAsync(
@Nullable Collection<KeyCacheObject> keys,
boolean readThrough,
boolean forcePrimary,
@@ -341,7 +343,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
expiryPlc,
skipVals,
needVer,
- false);
+ false,
+ null);
}
/**
@@ -370,7 +373,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
boolean skipVals,
boolean needVer,
boolean keepCacheObj,
- boolean recovery
+ boolean recovery,
+ @Nullable MvccCoordinatorVersion mvccVer
) {
GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx,
ctx.toCacheKeyObject(key),
@@ -384,7 +388,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
skipVals,
needVer,
keepCacheObj,
- recovery);
+ recovery,
+ mvccVer);
fut.init();
@@ -403,6 +408,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @param skipVals Skip values flag.
* @param needVer If {@code true} returns values as tuples containing value and version.
* @param keepCacheObj Keep cache objects flag.
+ * @param mvccVer Mvcc version.
* @return Load future.
*/
public final IgniteInternalFuture<Map<K, V>> loadAsync(
@@ -417,8 +423,11 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean skipVals,
boolean needVer,
- boolean keepCacheObj
+ boolean keepCacheObj,
+ @Nullable MvccCoordinatorVersion mvccVer
) {
+ assert mvccVer == null || ctx.mvccEnabled();
+
if (keys == null || keys.isEmpty())
return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
@@ -426,7 +435,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
expiryPlc = expiryPolicy(null);
// Optimisation: try to resolve value locally and escape 'get future' creation.
- if (!forcePrimary && ctx.affinityNode() && !ctx.mvccEnabled()) {
+ if (!forcePrimary && ctx.affinityNode() && (!ctx.mvccEnabled() || mvccVer != null)) {
try {
Map<K, V> locVals = null;
@@ -499,7 +508,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
taskName,
expiryPlc,
!deserializeBinary,
- null,
+ mvccVer,
null);
if (getRes != null) {
@@ -519,7 +528,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
taskName,
expiryPlc,
!deserializeBinary,
- null);
+ mvccVer);
}
// Entry was not in memory or in swap, so we remove it from cache.
@@ -602,7 +611,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
expiryPlc,
skipVals,
needVer,
- keepCacheObj);
+ keepCacheObj,
+ mvccVer);
fut.init(topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/af887544/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index c8dfc9f..08f20de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1659,6 +1659,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
}
}
+ private MvccCoordinatorVersion mvccReadVersion(GridCacheContext cctx) {
+ if (!cctx.mvccEnabled() || mvccTracker == null)
+ return null;
+
+ return mvccTracker.mvccVersion();
+ }
+
/**
* @param cacheCtx Cache context.
* @param keys Keys to get.
@@ -1830,8 +1837,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
resolveTaskName(),
null,
txEntry.keepBinary(),
- null,
- null); // TODO IGNITE-3478
+ null, // TODO IGNITE-3478
+ null);
if (getRes != null) {
val = getRes.value();
@@ -2214,8 +2221,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
resolveTaskName(),
accessPlc,
!deserializeBinary,
- null,
- null) : null; // TODO IGNITE-3478
+ mvccReadVersion(cacheCtx), // TODO IGNITE-3478
+ null) : null;
if (getRes != null) {
val = getRes.value();
@@ -2234,7 +2241,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
resolveTaskName(),
accessPlc,
!deserializeBinary,
- null); // TODO IGNITE-3478
+ mvccReadVersion(cacheCtx)); // TODO IGNITE-3478
}
if (val != null) {
@@ -2572,7 +2579,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
skipVals,
needVer,
/*keepCacheObject*/true,
- recovery
+ recovery,
+ mvccReadVersion(cacheCtx)
).chain(new C1<IgniteInternalFuture<Object>, Void>() {
@Override public Void apply(IgniteInternalFuture<Object> f) {
try {
@@ -2603,7 +2611,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
expiryPlc0,
skipVals,
needVer,
- /*keepCacheObject*/true
+ /*keepCacheObject*/true,
+ mvccReadVersion(cacheCtx)
).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() {
@Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) {
try {