You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/17 15:04:51 UTC

[06/10] ignite git commit: ignite-4680 wip

ignite-4680 wip


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3e7ee08a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3e7ee08a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3e7ee08a

Branch: refs/heads/ignite-4680-sb
Commit: 3e7ee08a486a7fcdd080c0807433abe22c3f171b
Parents: 855c66b
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Thu Mar 16 15:02:55 2017 +0300
Committer: Konstantin Dudkov <kd...@ya.ru>
Committed: Thu Mar 16 15:02:55 2017 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          | 32 ++++++++++++++++----
 .../GridNearAtomicAbstractUpdateRequest.java    |  8 +++++
 .../atomic/GridNearAtomicFullUpdateRequest.java |  6 ++--
 .../distributed/near/GridNearAtomicCache.java   |  5 ++-
 4 files changed, 38 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3e7ee08a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index e68d72d..973256f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1684,7 +1684,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final int stripeIdx,
         final UpdateReplyClosure completionCb
     ) {
-        IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
+        IgniteInternalFuture<Object> forceFut;
+
+        if (stripeIdx != IgniteThread.GRP_IDX_UNASSIGNED
+            && req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
+            && req.stripeMap() != null) {
+            int[] stripeIdxs = req.stripeMap().get(stripeIdx);
+
+            List<KeyCacheObject> keys = new ArrayList<>(stripeIdxs.length);
+
+            for (int i = 0; i < stripeIdxs.length; i++)
+                keys.add(req.key(stripeIdxs[i]));
+
+            forceFut = preldr.request(keys, req.topologyVersion());
+        }
+        else
+            forceFut = preldr.request(req, req.topologyVersion());
 
         if (forceFut == null || forceFut.isDone()) {
             try {
@@ -1782,8 +1797,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         if (stripeIdx != IgniteThread.GRP_IDX_UNASSIGNED
             && req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
-            && ((GridNearAtomicFullUpdateRequest)req).stripeMap() != null) {
-            stripeIdxs = ((GridNearAtomicFullUpdateRequest)req).stripeMap().get(stripeIdx);
+            && req.stripeMap() != null) {
+            stripeIdxs = req.stripeMap().get(stripeIdx);
 
             res.stripe(stripeIdx);
         }
@@ -1958,6 +1973,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         if (remap) {
             assert dhtFut == null;
+            res.stripe(-1);
 
             completionCb.apply(req, res);
         }
@@ -2888,8 +2904,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         AffinityTopologyVersion topVer,
         int[] stripeIdxs)
         throws GridDhtInvalidPartitionException {
-        if (req.size() == 1) {
-            KeyCacheObject key = req.key(0);
+
+        int keysNum = stripeIdxs == null ? req.size() : stripeIdxs.length;
+
+        if (keysNum == 1) {
+            int idx = stripeIdxs != null ? stripeIdxs[0] : 0;
+
+            KeyCacheObject key = req.key(idx);
 
             while (true) {
                 GridDhtCacheEntry entry = entryExx(key, topVer);
@@ -2903,7 +2924,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
         else {
-            int keysNum = stripeIdxs == null ? req.size() : stripeIdxs.length;
             List<GridDhtCacheEntry> locked = new ArrayList<>(keysNum);
 
             while (true) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e7ee08a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index 0748434..c8e904d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
@@ -443,6 +444,13 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
      */
     public abstract KeyCacheObject key(int idx);
 
+    /**
+     * @return Stripe map, or {@code null} if none (this default implementation always returns {@code null}).
+     */
+    @Nullable public Map<Integer, int[]> stripeMap() {
+        return null;
+    }
+
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
         return 10;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e7ee08a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 5a8c66b..2e619ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -404,10 +404,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         return expiryPlc;
     }
 
-    /**
-     * @return Stripe mapping.
-     */
-    @Nullable public Map<Integer, int[]> stripeMap() {
+    /** {@inheritDoc} */
+    @Override @Nullable public Map<Integer, int[]> stripeMap() {
         return stripeMap;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e7ee08a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 5844021..299386c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -46,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -134,8 +133,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
         int keyNum;
         int[] stripeIdxs;
 
-        if (res.stripe() > -1 && req instanceof GridNearAtomicFullUpdateRequest) {
-            stripeIdxs = ((GridNearAtomicFullUpdateRequest)req).stripeMap().get(res.stripe());
+        if (res.stripe() > -1 && req.stripeMap() != null) {
+            stripeIdxs = req.stripeMap().get(res.stripe());
             keyNum = stripeIdxs.length;
         }
         else {