You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/17 15:04:53 UTC
[08/10] ignite git commit: tmp
tmp
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3200c2e4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3200c2e4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3200c2e4
Branch: refs/heads/ignite-4680-sb
Commit: 3200c2e43a9b2cee1d35a19f749de50f19f5d0d4
Parents: 5f51839
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 17 16:59:26 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 17 17:46:01 2017 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 36 +--
.../dht/atomic/GridDhtAtomicCache.java | 265 +++++++++----------
.../dht/atomic/NearAtomicResponseHelper.java | 29 +-
3 files changed, 151 insertions(+), 179 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3200c2e4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 6dad30b..17ae595 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -856,23 +856,25 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
// }
- if (plc == GridIoPolicy.SYSTEM_POOL &&
- (msg.partition() != Integer.MIN_VALUE ||
- msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE)) {
- Map<Integer, int[]> stripemap = msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE ?
- ((GridNearAtomicFullUpdateRequest)msg.message()).stripeMap() : null;
-
- if (stripemap != null) {
- GridNearAtomicFullUpdateRequest msg0 = ((GridNearAtomicFullUpdateRequest)msg.message());
-
- msg0.responseHelper(new NearAtomicResponseHelper(stripemap.keySet()));
-
- for (Integer stripe : stripemap.keySet()) {
- stripedExecutor.execute(stripe, c);
- }
- }
- else
- stripedExecutor.execute(msg.partition(), c);
+ if (plc == GridIoPolicy.SYSTEM_POOL && (msg.partition() != Integer.MIN_VALUE)) {
+// Map<Integer, int[]> stripemap = msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE ?
+// ((GridNearAtomicFullUpdateRequest)msg.message()).stripeMap() : null;
+//
+// if (stripemap != null) {
+// GridNearAtomicFullUpdateRequest msg0 = ((GridNearAtomicFullUpdateRequest)msg.message());
+//
+// msg0.responseHelper(new NearAtomicResponseHelper(stripemap.keySet()));
+//
+// for (Integer stripe : stripemap.keySet()) {
+// stripedExecutor.execute(stripe, c);
+// }
+// }
+// else
+
+// if (msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE)
+// stripedExecutor.execute(((GridNearAtomicFullUpdateRequest)msg.message()).stripeMap().keySet().iterator().next(), c);
+// else
+ stripedExecutor.execute(msg.partition(), c);
return;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3200c2e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index dcc79d0..bcfea79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -213,14 +213,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
if (req.writeSynchronizationMode() != FULL_ASYNC) {
- if (req.responseHelper() != null) {
- GridNearAtomicUpdateResponse res0 = req.responseHelper().addResponse(res);
-
- if (res0 != null)
- sendNearUpdateReply(res.nodeId(), res0);
- }
- else
- sendNearUpdateReply(res.nodeId(), res);
+ sendNearUpdateReply(res.nodeId(), res);
}
else {
if (res.remapTopologyVersion() != null)
@@ -1684,22 +1677,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final int stripeIdx,
final UpdateReplyClosure completionCb
) {
- IgniteInternalFuture<Object> forceFut;
-
- if (stripeIdx != IgniteThread.GRP_IDX_UNASSIGNED
- && req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
- && req.stripeMap() != null) {
- int[] stripeIdxs = req.stripeMap().get(stripeIdx);
+// if (true) {
+// updateAllAsyncInternal0(nodeId, req, ((IgniteThread)Thread.currentThread()).stripe(), completionCb);
+//
+// return;
+// }
- List<KeyCacheObject> keys = new ArrayList<>(stripeIdxs.length);
-
- for (int i = 0; i < stripeIdxs.length; i++)
- keys.add(req.key(stripeIdxs[i]));
-
- forceFut = preldr.request(keys, req.topologyVersion());
- }
- else
- forceFut = preldr.request(req, req.topologyVersion());
+ IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
if (forceFut == null || forceFut.isDone()) {
try {
@@ -1715,9 +1699,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
- updateAllAsyncInternal0(nodeId, req, stripeIdx, completionCb);
+ updateAllAsyncInternal0(nodeId, req, ((IgniteThread)Thread.currentThread()).stripe(), completionCb);
}
else {
+ if (true)
+ throw new RuntimeException("error");
+
forceFut.listen(new CI1<IgniteInternalFuture<Object>>() {
@Override public void apply(IgniteInternalFuture<Object> fut) {
try {
@@ -1761,9 +1748,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
completionCb.apply(req, res);
}
- private GridCacheVersion ver;
-
-
/**
* Executes local update after preloader fetched values.
*
@@ -1774,11 +1758,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
private void updateAllAsyncInternal0(
UUID nodeId,
- GridNearAtomicAbstractUpdateRequest req,
+ final GridNearAtomicAbstractUpdateRequest req,
int stripeIdx,
- UpdateReplyClosure completionCb
+ final UpdateReplyClosure completionCb
) {
- ClusterNode node = ctx.discovery().node(nodeId);
+ final ClusterNode node = ctx.discovery().node(nodeId);
if (node == null) {
U.warn(msgLog, "Skip near update request, node originated update request left [" +
@@ -1787,25 +1771,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
- nodeId,
- req.futureId(),
- req.partition(),
- false,
- ctx.deploymentEnabled());
-
- res.partition(req.partition());
-
- int[] stripeIdxs = null;
-
- if (stripeIdx != IgniteThread.GRP_IDX_UNASSIGNED
- && req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
- && req.stripeMap() != null) {
- stripeIdxs = req.stripeMap().get(stripeIdx);
-
- res.stripe(stripeIdx);
- }
-
assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
GridDhtAtomicAbstractUpdateFuture dhtFut = null;
@@ -1830,103 +1795,41 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
if (top.stopping()) {
- addAllKeysAsFailed(req, res, stripeIdxs, new IgniteCheckedException("Failed to perform cache operation " +
- "(cache is stopped): " + name()));
-
- completionCb.apply(req, res);
return;
}
// Do not check topology version if topology was locked on near node by
// external transaction or explicit lock.
- if (true || req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
- locked = lockEntries(req, req.topologyVersion(), stripeIdxs);
-
- boolean hasNear = ctx.discovery().cacheNearNode(node, name());
-
- // Assign next version for update inside entries lock.
- if (ver == null)
- ver = ctx.versions().next(top.topologyVersion());
-
- if (hasNear)
- res.nearVersion(ver);
-
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Assigned update version [futId=" + req.futureId() +
- ", writeVer=" + ver + ']');
- }
-
- assert ver != null : "Got null version for update request: " + req;
-
- boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
-
- int size = stripeIdxs == null ? req.size() : stripeIdxs.length;
-
- dhtFut = null;//createDhtFuture(ver, req, size);
-
- expiry = expiryPolicy(req.expiry());
+ if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
+ Map<Integer, int[]> stripemap = req.stripeMap();
- GridCacheReturn retVal = null;
+ final GridDhtAtomicAbstractUpdateFuture fut = createDhtFuture(null, req, req.size());
- if (size > 1 && // Several keys ...
- writeThrough() && !req.skipStore() && // and store is enabled ...
- !ctx.store().isLocal() && // and this is not local store ...
- // (conflict resolver should be used for local store)
- !ctx.dr().receiveEnabled() // and no DR.
- ) {
- // This method can only be used when there are no replicated entries in the batch.
- UpdateBatchResult updRes = updateWithBatch(node,
- hasNear,
- req,
- res,
- locked,
- ver,
- dhtFut,
- ctx.isDrEnabled(),
- taskName,
- expiry,
- sndPrevVal,
- stripeIdxs);
-
- deleted = updRes.deleted();
- dhtFut = updRes.dhtFuture();
-
- if (req.operation() == TRANSFORM)
- retVal = updRes.invokeResults();
- }
- else {
- UpdateSingleResult updRes = updateSingle(node,
- hasNear,
- req,
- res,
- locked,
- ver,
- dhtFut,
- ctx.isDrEnabled(),
- taskName,
- expiry,
- sndPrevVal,
- stripeIdxs);
+ ((GridNearAtomicFullUpdateRequest)req).responseHelper(new NearAtomicResponseHelper(stripemap.size()));
- retVal = updRes.returnValue();
- deleted = updRes.deleted();
- dhtFut = updRes.dhtFuture();
+ for (final Map.Entry<Integer, int[]> e : stripemap.entrySet()) {
+ if (stripeIdx == e.getKey())
+ update(fut, node, req, e.getValue(), completionCb);
+ else {
+ ctx.kernalContext().getStripedExecutorService().execute(e.getKey(), new Runnable() {
+ @Override public void run() {
+ try {
+ update(fut, node, req, e.getValue(), completionCb);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
}
-
- if (retVal == null)
- retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, true);
-
- res.returnValue(retVal);
-
- if (dhtFut != null)
- ctx.mvcc().addAtomicFuture(dhtFut.id(), dhtFut);
}
else {
// Should remap all keys.
remap = true;
- res.remapTopologyVersion(top.topologyVersion());
+ //res.remapTopologyVersion(top.topologyVersion());
}
}
finally {
@@ -1958,16 +1861,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
remap = true;
- res.remapTopologyVersion(ctx.topology().topologyVersion());
+ //res.remapTopologyVersion(ctx.topology().topologyVersion());
}
catch (Throwable e) {
// At least RuntimeException can be thrown by the code above when GridCacheContext is cleaned and there is
// an attempt to use cleaned resources.
U.error(log, "Unexpected exception during cache update", e);
- addAllKeysAsFailed(req, res, stripeIdxs, e);
+ //addAllKeysAsFailed(req, res, stripeIdxs, e);
- completionCb.apply(req, res);
+ //completionCb.apply(req, res);
if (e instanceof Error)
throw e;
@@ -1975,23 +1878,91 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
- if (remap) {
- assert dhtFut == null;
- res.stripe(-1);
+// if (remap) {
+// assert dhtFut == null;
+// res.stripe(-1);
+//
+// completionCb.apply(req, res);
+// }
+// else {
+// if (dhtFut != null)
+// dhtFut.map(node, res.returnValue(), res, completionCb);
+// else
+// completionCb.apply(req, res);
+// }
+//
+// if (req.writeSynchronizationMode() != FULL_ASYNC)
+// req.cleanup(!node.isLocal());
+//
+// sendTtlUpdateRequest(expiry);
+ }
+
+ private void update(
+ GridDhtAtomicAbstractUpdateFuture fut,
+ ClusterNode node,
+ GridNearAtomicAbstractUpdateRequest req,
+ int[] stripeIdxs,
+ UpdateReplyClosure completionCb) throws GridCacheEntryRemovedException {
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
+ node.id(),
+ req.futureId(),
+ req.partition(),
+ false,
+ ctx.deploymentEnabled());
- completionCb.apply(req, res);
- }
- else {
- if (dhtFut != null)
- dhtFut.map(node, res.returnValue(), res, completionCb);
- else
- completionCb.apply(req, res);
+ List<GridDhtCacheEntry> locked = lockEntries(req, req.topologyVersion(), stripeIdxs);
+
+ boolean hasNear = ctx.discovery().cacheNearNode(node, name());
+
+ // Assign next version for update inside entries lock.
+ //if (ver == null)
+ GridCacheVersion ver = ctx.versions().next(ctx.topology().topologyVersion());
+
+ if (hasNear)
+ res.nearVersion(ver);
+
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Assigned update version [futId=" + req.futureId() +
+ ", writeVer=" + ver + ']');
}
- if (req.writeSynchronizationMode() != FULL_ASYNC)
- req.cleanup(!node.isLocal());
+ assert ver != null : "Got null version for update request: " + req;
+
+ boolean sndPrevVal = false;//!top.rebalanceFinished(req.topologyVersion());
+
+ int size = stripeIdxs == null ? req.size() : stripeIdxs.length;
+
+ GridCacheReturn retVal = null;
+
+ UpdateSingleResult updRes = updateSingle(node,
+ hasNear,
+ req,
+ res,
+ locked,
+ ver,
+ null,
+ ctx.isDrEnabled(),
+ null,
+ null,
+ sndPrevVal,
+ stripeIdxs);
+
+ retVal = updRes.returnValue();
+
+ if (retVal == null)
+ retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, true);
- sendTtlUpdateRequest(expiry);
+ res.returnValue(retVal);
+
+ unlockEntries(locked, null);
+
+ GridNearAtomicUpdateResponse res0 = req.responseHelper().addResponse(res);
+
+ if (res0 != null) {
+ fut.onDone();
+
+ completionCb.apply(req, res);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/3200c2e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
index 00c9f6c..55c450c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
@@ -17,8 +17,7 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
*
@@ -28,14 +27,16 @@ public class NearAtomicResponseHelper {
/** */
private GridNearAtomicUpdateResponse res;
+ private static final AtomicIntegerFieldUpdater<NearAtomicResponseHelper> UPD =
+ AtomicIntegerFieldUpdater.newUpdater(NearAtomicResponseHelper.class, "cnt");
+
/** */
- private Set<Integer> stripes;
+ private volatile int cnt;
/**
- * @param stripes Stripes collection.
*/
- public NearAtomicResponseHelper(Set<Integer> stripes) {
- this.stripes = new HashSet<>(stripes);
+ public NearAtomicResponseHelper(int cnt) {
+ this.cnt = cnt;
}
/**
@@ -43,18 +44,16 @@ public class NearAtomicResponseHelper {
* @return {@code true} if all responses added.
*/
public GridNearAtomicUpdateResponse addResponse(GridNearAtomicUpdateResponse res) {
- synchronized (this) {
- if (res.stripe() == -1)
- return res;
+ this.res = res;
- if (stripes.remove(res.stripe())) {
- mergeResponse(res);
+ int c = UPD.decrementAndGet(this);
- return stripes.isEmpty() ? this.res : null;
- }
+ //mergeResponse(res);
- return null;
- }
+ if (c == 0)
+ return this.res;
+
+ return null;
}
/**