You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/17 15:04:51 UTC
[06/10] ignite git commit: ignite-4680 wip
ignite-4680 wip
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3e7ee08a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3e7ee08a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3e7ee08a
Branch: refs/heads/ignite-4680-sb
Commit: 3e7ee08a486a7fcdd080c0807433abe22c3f171b
Parents: 855c66b
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Thu Mar 16 15:02:55 2017 +0300
Committer: Konstantin Dudkov <kd...@ya.ru>
Committed: Thu Mar 16 15:02:55 2017 +0300
----------------------------------------------------------------------
.../dht/atomic/GridDhtAtomicCache.java | 32 ++++++++++++++++----
.../GridNearAtomicAbstractUpdateRequest.java | 8 +++++
.../atomic/GridNearAtomicFullUpdateRequest.java | 6 ++--
.../distributed/near/GridNearAtomicCache.java | 5 ++-
4 files changed, 38 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3e7ee08a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index e68d72d..973256f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1684,7 +1684,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final int stripeIdx,
final UpdateReplyClosure completionCb
) {
- IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
+ IgniteInternalFuture<Object> forceFut;
+
+ if (stripeIdx != IgniteThread.GRP_IDX_UNASSIGNED
+ && req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
+ && req.stripeMap() != null) {
+ int[] stripeIdxs = req.stripeMap().get(stripeIdx);
+
+ List<KeyCacheObject> keys = new ArrayList<>(stripeIdxs.length);
+
+ for (int i = 0; i < stripeIdxs.length; i++)
+ keys.add(req.key(stripeIdxs[i]));
+
+ forceFut = preldr.request(keys, req.topologyVersion());
+ }
+ else
+ forceFut = preldr.request(req, req.topologyVersion());
if (forceFut == null || forceFut.isDone()) {
try {
@@ -1782,8 +1797,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (stripeIdx != IgniteThread.GRP_IDX_UNASSIGNED
&& req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
- && ((GridNearAtomicFullUpdateRequest)req).stripeMap() != null) {
- stripeIdxs = ((GridNearAtomicFullUpdateRequest)req).stripeMap().get(stripeIdx);
+ && req.stripeMap() != null) {
+ stripeIdxs = req.stripeMap().get(stripeIdx);
res.stripe(stripeIdx);
}
@@ -1958,6 +1973,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (remap) {
assert dhtFut == null;
+ res.stripe(-1);
completionCb.apply(req, res);
}
@@ -2888,8 +2904,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
AffinityTopologyVersion topVer,
int[] stripeIdxs)
throws GridDhtInvalidPartitionException {
- if (req.size() == 1) {
- KeyCacheObject key = req.key(0);
+
+ int keysNum = stripeIdxs == null ? req.size() : stripeIdxs.length;
+
+ if (keysNum == 1) {
+ int idx = stripeIdxs != null ? stripeIdxs[0] : 0;
+
+ KeyCacheObject key = req.key(idx);
while (true) {
GridDhtCacheEntry entry = entryExx(key, topVer);
@@ -2903,7 +2924,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
else {
- int keysNum = stripeIdxs == null ? req.size() : stripeIdxs.length;
List<GridDhtCacheEntry> locked = new ArrayList<>(keysNum);
while (true) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/3e7ee08a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index 0748434..c8e904d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
@@ -443,6 +444,13 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
*/
public abstract KeyCacheObject key(int idx);
+ /**
+ * @return Stripe map.
+ */
+ @Nullable public Map<Integer, int[]> stripeMap() {
+ return null;
+ }
+
/** {@inheritDoc} */
@Override public byte fieldsCount() {
return 10;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3e7ee08a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 5a8c66b..2e619ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -404,10 +404,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
return expiryPlc;
}
- /**
- * @return Stripe mapping.
- */
- @Nullable public Map<Integer, int[]> stripeMap() {
+ /** {@inheritDoc} */
+ @Override @Nullable public Map<Integer, int[]> stripeMap() {
return stripeMap;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3e7ee08a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 5844021..299386c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -46,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -134,8 +133,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
int keyNum;
int[] stripeIdxs;
- if (res.stripe() > -1 && req instanceof GridNearAtomicFullUpdateRequest) {
- stripeIdxs = ((GridNearAtomicFullUpdateRequest)req).stripeMap().get(res.stripe());
+ if (res.stripe() > -1 && req.stripeMap() != null) {
+ stripeIdxs = req.stripeMap().get(res.stripe());
keyNum = stripeIdxs.length;
}
else {