You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/12 11:24:18 UTC

[04/10] ignite git commit: ignite-5932

ignite-5932


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/af887544
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/af887544
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/af887544

Branch: refs/heads/ignite-5932
Commit: af887544099dd3e2f427b74a029f601ffddb4471
Parents: 1780062
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 12 12:30:02 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 12 12:30:02 2017 +0300

----------------------------------------------------------------------
 .../dht/GridPartitionedGetFuture.java           | 15 ++++++++--
 .../dht/GridPartitionedSingleGetFuture.java     | 21 ++++++++++----
 .../dht/atomic/GridDhtAtomicCache.java          |  6 ++--
 .../dht/colocated/GridDhtColocatedCache.java    | 30 +++++++++++++-------
 .../cache/distributed/near/GridNearTxLocal.java | 23 ++++++++++-----
 5 files changed, 68 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/af887544/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 7993d05..7689a4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -78,6 +78,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
     private static IgniteLogger log;
 
     /** */
+    protected final MvccCoordinatorVersion mvccVer;
+
+    /** */
     private MvccQueryTracker mvccTracker;
 
     /**
@@ -94,6 +97,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
      * @param skipVals Skip values flag.
      * @param needVer If {@code true} returns values as tuples containing value and version.
      * @param keepCacheObjects Keep cache objects flag.
+     * @param mvccVer Mvcc version.
      */
     public GridPartitionedGetFuture(
         GridCacheContext<K, V> cctx,
@@ -107,7 +111,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
         boolean skipVals,
         boolean needVer,
-        boolean keepCacheObjects
+        boolean keepCacheObjects,
+        @Nullable MvccCoordinatorVersion mvccVer
     ) {
         super(cctx,
             keys,
@@ -121,6 +126,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
             needVer,
             keepCacheObjects,
             recovery);
+        assert mvccVer == null || cctx.mvccEnabled();
+
+        this.mvccVer = mvccVer;
 
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, GridPartitionedGetFuture.class);
@@ -133,6 +141,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         if (!cctx.mvccEnabled())
             return null;
 
+        if (mvccVer != null)
+            return mvccVer;
+
         MvccCoordinatorVersion ver = mvccTracker.mvccVersion();
 
         assert ver != null : "[fut=" + this + ", mvccTracker=" + mvccTracker + "]";
@@ -158,7 +169,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                 canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
         }
 
-        if (cctx.mvccEnabled()) {
+        if (cctx.mvccEnabled() && mvccVer == null) {
             mvccTracker = new MvccQueryTracker(cctx, canRemap, this);
 
             trackable = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/af887544/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index b34687f..afef744 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -41,11 +41,12 @@ import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -122,6 +123,9 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
     @GridToStringInclude
     private ClusterNode node;
 
+    /** */
+    protected final MvccCoordinatorVersion mvccVer;
+
     /**
      * @param cctx Context.
      * @param key Key.
@@ -149,9 +153,11 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
         boolean skipVals,
         boolean needVer,
         boolean keepCacheObjects,
-        boolean recovery
+        boolean recovery,
+        @Nullable MvccCoordinatorVersion mvccVer
     ) {
         assert key != null;
+        assert mvccVer == null || cctx.mvccEnabled();
 
         AffinityTopologyVersion lockedTopVer = cctx.shared().lockedTopologyVersion(null);
 
@@ -176,6 +182,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
         this.keepCacheObjects = keepCacheObjects;
         this.recovery = recovery;
         this.topVer = topVer;
+        this.mvccVer = mvccVer;
 
         futId = IgniteUuid.randomUuid();
 
@@ -275,6 +282,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                 cctx.mvcc().addFuture(this, futId);
             }
 
+            // TODO IGNITE-3478.
             GridCacheMessage req = new GridNearSingleGetRequest(cctx.cacheId(),
                 futId.localId(),
                 key,
@@ -355,7 +363,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                 boolean skipEntry = readNoEntry;
 
                 if (readNoEntry) {
-                    CacheDataRow row = cctx.offheap().read(cctx, key); // TODO IGNITE-3478
+                    CacheDataRow row = mvccVer != null ? cctx.offheap().mvccRead(cctx, key, mvccVer) :
+                        cctx.offheap().read(cctx, key);
 
                     if (row != null) {
                         long expireTime = row.expireTime();
@@ -398,8 +407,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                                 taskName,
                                 expiryPlc,
                                 true,
-                                null,
-                                null); // TODO IGNITE-3478
+                                mvccVer,
+                                null);
 
                             if (res != null) {
                                 v = res.value();
@@ -418,7 +427,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                                 taskName,
                                 expiryPlc,
                                 true,
-                                null); // TODO IGNITE-3478
+                                mvccVer);
                         }
 
                         colocated.context().evicts().touch(entry, topVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/af887544/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 16416cc..d6862fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1385,7 +1385,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             skipVals,
             needVer,
             false,
-            recovery);
+            recovery,
+            null);
 
         fut.init();
 
@@ -1591,7 +1592,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             expiry,
             skipVals,
             needVer,
-            false);
+            false,
+            null);
 
         fut.init(topVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/af887544/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 7364cb3..c975edb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
@@ -241,7 +242,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             skipVals,
             needVer,
             /*keepCacheObjects*/false,
-            opCtx != null && opCtx.recovery());
+            opCtx != null && opCtx.recovery(),
+            null);
 
         fut.init();
 
@@ -319,7 +321,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @param needVer Need version.
      * @return Loaded values.
      */
-    public IgniteInternalFuture<Map<K, V>> loadAsync(
+    private IgniteInternalFuture<Map<K, V>> loadAsync(
         @Nullable Collection<KeyCacheObject> keys,
         boolean readThrough,
         boolean forcePrimary,
@@ -341,7 +343,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             expiryPlc,
             skipVals,
             needVer,
-            false);
+            false,
+            null);
     }
 
     /**
@@ -370,7 +373,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         boolean skipVals,
         boolean needVer,
         boolean keepCacheObj,
-        boolean recovery
+        boolean recovery,
+        @Nullable MvccCoordinatorVersion mvccVer
     ) {
         GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx,
             ctx.toCacheKeyObject(key),
@@ -384,7 +388,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             skipVals,
             needVer,
             keepCacheObj,
-            recovery);
+            recovery,
+            mvccVer);
 
         fut.init();
 
@@ -403,6 +408,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @param skipVals Skip values flag.
      * @param needVer If {@code true} returns values as tuples containing value and version.
      * @param keepCacheObj Keep cache objects flag.
+     * @param mvccVer Mvcc version.
      * @return Load future.
      */
     public final IgniteInternalFuture<Map<K, V>> loadAsync(
@@ -417,8 +423,11 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
         boolean skipVals,
         boolean needVer,
-        boolean keepCacheObj
+        boolean keepCacheObj,
+        @Nullable MvccCoordinatorVersion mvccVer
     ) {
+        assert mvccVer == null || ctx.mvccEnabled();
+
         if (keys == null || keys.isEmpty())
             return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
 
@@ -426,7 +435,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             expiryPlc = expiryPolicy(null);
 
         // Optimisation: try to resolve value locally and escape 'get future' creation.
-        if (!forcePrimary && ctx.affinityNode() && !ctx.mvccEnabled()) {
+        if (!forcePrimary && ctx.affinityNode() && (!ctx.mvccEnabled() || mvccVer != null)) {
             try {
                 Map<K, V> locVals = null;
 
@@ -499,7 +508,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                                             taskName,
                                             expiryPlc,
                                             !deserializeBinary,
-                                            null,
+                                            mvccVer,
                                             null);
 
                                         if (getRes != null) {
@@ -519,7 +528,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                                             taskName,
                                             expiryPlc,
                                             !deserializeBinary,
-                                            null);
+                                            mvccVer);
                                     }
 
                                     // Entry was not in memory or in swap, so we remove it from cache.
@@ -602,7 +611,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             expiryPlc,
             skipVals,
             needVer,
-            keepCacheObj);
+            keepCacheObj,
+            mvccVer);
 
         fut.init(topVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/af887544/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index c8dfc9f..08f20de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1659,6 +1659,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
         }
     }
 
+    private MvccCoordinatorVersion mvccReadVersion(GridCacheContext cctx) {
+        if (!cctx.mvccEnabled() || mvccTracker == null)
+            return null;
+
+        return mvccTracker.mvccVersion();
+    }
+
     /**
      * @param cacheCtx Cache context.
      * @param keys Keys to get.
@@ -1830,8 +1837,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                                             resolveTaskName(),
                                             null,
                                             txEntry.keepBinary(),
-                                            null,
-                                            null); // TODO IGNITE-3478
+                                            null, // TODO IGNITE-3478
+                                            null);
 
                                         if (getRes != null) {
                                             val = getRes.value();
@@ -2214,8 +2221,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                                         resolveTaskName(),
                                         accessPlc,
                                         !deserializeBinary,
-                                        null,
-                                        null) : null; // TODO IGNITE-3478
+                                        mvccReadVersion(cacheCtx), // TODO IGNITE-3478
+                                        null) : null;
 
                                 if (getRes != null) {
                                     val = getRes.value();
@@ -2234,7 +2241,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                                     resolveTaskName(),
                                     accessPlc,
                                     !deserializeBinary,
-                                    null); // TODO IGNITE-3478
+                                    mvccReadVersion(cacheCtx)); // TODO IGNITE-3478
                             }
 
                             if (val != null) {
@@ -2572,7 +2579,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                     skipVals,
                     needVer,
                     /*keepCacheObject*/true,
-                    recovery
+                    recovery,
+                    mvccReadVersion(cacheCtx)
                 ).chain(new C1<IgniteInternalFuture<Object>, Void>() {
                     @Override public Void apply(IgniteInternalFuture<Object> f) {
                         try {
@@ -2603,7 +2611,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                     expiryPlc0,
                     skipVals,
                     needVer,
-                    /*keepCacheObject*/true
+                    /*keepCacheObject*/true,
+                    mvccReadVersion(cacheCtx)
                 ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() {
                     @Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) {
                         try {