You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/03 10:47:08 UTC
[5/8] ignite git commit: IGNITE-2532: Single update request is
finally wired up. Though, it is not optimized yet.
IGNITE-2532: Single update request is finally wired up. Though, it is not optimized yet.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2a1a31d2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2a1a31d2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2a1a31d2
Branch: refs/heads/ignite-2523
Commit: 2a1a31d2d0999fdb8854a05fd7a75a4cd0b159a4
Parents: 29c2aee
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Feb 3 11:36:39 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 3 11:36:39 2016 +0300
----------------------------------------------------------------------
.../GridNearAbstractAtomicUpdateFuture.java | 4 +
.../dht/atomic/GridNearAtomicUpdateFuture.java | 142 +++++++++++++------
.../dht/atomic/GridNearAtomicUpdateRequest.java | 3 +-
.../GridNearAtomicUpdateRequestInterface.java | 4 +
4 files changed, 107 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a1a31d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
index 60e0c5f..f8c6810 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -53,6 +54,9 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA
*/
public abstract class GridNearAbstractAtomicUpdateFuture extends GridFutureAdapter<Object>
implements GridCacheAtomicFuture<Object> {
+ /** */
+ public static final IgniteProductVersion SINGLE_PUT_MSG_SINCE = IgniteProductVersion.fromString("1.6.0");
+
/** Logger reference. */
protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a1a31d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 2aa510d..493c765 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -108,7 +108,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
private Collection<KeyCacheObject> remapKeys;
/** Not null if operation is mapped to a single node. */
- private GridNearAtomicUpdateRequest singleReq;
+ private GridNearAtomicUpdateRequestInterface singleReq;
/** Operation result. */
private GridCacheReturn opRes;
@@ -195,7 +195,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
GridNearAtomicUpdateResponse res = null;
synchronized (this) {
- GridNearAtomicUpdateRequest req;
+ GridNearAtomicUpdateRequestInterface req;
if (singleReq != null)
req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
@@ -339,11 +339,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
* @param nodeId Node ID.
* @param req Request.
*/
- private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
+ private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequestInterface req) {
if (cctx.localNodeId().equals(nodeId)) {
cache.updateAllAsyncInternal(nodeId, req,
- new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+ new CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse>() {
+ @Override public void apply(GridNearAtomicUpdateRequestInterface req, GridNearAtomicUpdateResponse res) {
onResult(res.nodeId(), res);
}
});
@@ -353,7 +353,13 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
if (log.isDebugEnabled())
log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
- cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+ if (req instanceof GridNearAtomicUpdateRequest)
+ cctx.io().send(req.nodeId(), (GridNearAtomicUpdateRequest)req, cctx.ioPolicy());
+ else {
+ assert req instanceof GridNearAtomicSingleUpdateRequest;
+
+ cctx.io().send(req.nodeId(), (GridNearAtomicSingleUpdateRequest)req, cctx.ioPolicy());
+ }
if (syncMode == FULL_ASYNC)
onDone(new GridCacheReturn(cctx, true, true, null, true));
@@ -372,7 +378,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
UUID locNodeId = cctx.localNodeId();
- GridNearAtomicUpdateRequest locUpdate = null;
+ GridNearAtomicUpdateRequestInterface locUpdate = null;
// Send messages to remote nodes first, then run local update.
for (GridNearAtomicUpdateRequest req : mappings.values()) {
@@ -415,7 +421,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
*/
@SuppressWarnings("unchecked")
void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
- GridNearAtomicUpdateRequest req;
+ GridNearAtomicUpdateRequestInterface req;
AffinityTopologyVersion remapTopVer = null;
@@ -545,7 +551,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
if (rcvAll && nearEnabled) {
if (mappings != null) {
- for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
+ for (GridNearAtomicUpdateRequestInterface req0 : mappings.values()) {
GridNearAtomicUpdateResponse res0 = req0.response();
assert res0 != null : req0;
@@ -619,7 +625,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
* @param req Request.
* @param e Error.
*/
- void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
+ void onSendError(GridNearAtomicUpdateRequestInterface req, IgniteCheckedException e) {
synchronized (this) {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
req.nodeId(),
@@ -647,7 +653,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
}
Exception err = null;
- GridNearAtomicUpdateRequest singleReq0 = null;
+ GridNearAtomicUpdateRequestInterface singleReq0 = null;
Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
int size = keys.size();
@@ -674,7 +680,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
if (size == 1 && !fastMap) {
assert remapKeys == null || remapKeys.size() == 1;
- singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+ singleReq0 = mapSingleUpdate(topVer, topNodes, futVer, updVer);
}
else {
Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
@@ -799,6 +805,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
* @return Mapping.
* @throws Exception If failed.
*/
+ @SuppressWarnings("ConstantConditions")
private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
AffinityTopologyVersion topVer,
GridCacheVersion futVer,
@@ -926,14 +933,14 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
/**
* @param topVer Topology version.
+ * @param topNodes Topology nodes.
* @param futVer Future version.
* @param updVer Update version.
* @return Request.
* @throws Exception If failed.
*/
- private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
- GridCacheVersion futVer,
- @Nullable GridCacheVersion updVer) throws Exception {
+ private GridNearAtomicUpdateRequestInterface mapSingleUpdate(AffinityTopologyVersion topVer,
+ Collection<ClusterNode> topNodes, GridCacheVersion futVer, @Nullable GridCacheVersion updVer) throws Exception {
Object key = F.first(keys);
Object val;
@@ -990,36 +997,81 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
"left the grid).");
- GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
- cctx.cacheId(),
- primary.id(),
- futVer,
- fastMap,
- updVer,
- topVer,
- topLocked,
- syncMode,
- op,
- retval,
- expiryPlc,
- invokeArgs,
- filter,
- subjId,
- taskNameHash,
- skipStore,
- keepBinary,
- cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled(),
- 1);
-
- req.addUpdateEntry(cacheKey,
- val,
- conflictTtl,
- conflictExpireTime,
- conflictVer,
- true);
-
- return req;
+ // Decide whether we will use the optimized version of the update request.
+ boolean optimize = true;
+
+ for (ClusterNode topNode : topNodes) {
+ if (topNode.version().compareTo(SINGLE_PUT_MSG_SINCE) < 0) {
+ optimize = false;
+
+ break;
+ }
+ }
+
+ if (optimize) {
+ GridNearAtomicSingleUpdateRequest req = new GridNearAtomicSingleUpdateRequest(
+ cctx.cacheId(),
+ primary.id(),
+ futVer,
+ fastMap,
+ updVer,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
+ cctx.kernalContext().clientNode(),
+ cctx.deploymentEnabled(),
+ 1);
+
+ req.addUpdateEntry(cacheKey,
+ val,
+ conflictTtl,
+ conflictExpireTime,
+ conflictVer,
+ true);
+
+ return req;
+ }
+ else {
+ GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
+ cctx.cacheId(),
+ primary.id(),
+ futVer,
+ fastMap,
+ updVer,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
+ cctx.kernalContext().clientNode(),
+ cctx.deploymentEnabled(),
+ 1);
+
+ req.addUpdateEntry(cacheKey,
+ val,
+ conflictTtl,
+ conflictExpireTime,
+ conflictVer,
+ true);
+
+ return req;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a1a31d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 340dbf6..674a5be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -373,6 +373,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage
* @param conflictVer Conflict version (optional).
* @param primary If given key is primary on this mapping.
*/
+ @SuppressWarnings("unchecked")
public void addUpdateEntry(KeyCacheObject key,
@Nullable Object val,
long conflictTtl,
@@ -384,7 +385,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage
if (op == TRANSFORM) {
assert val instanceof EntryProcessor : val;
- entryProcessor = (EntryProcessor<Object, Object, Object>) val;
+ entryProcessor = (EntryProcessor<Object, Object, Object>)val;
}
assert val != null || op == DELETE;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a1a31d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestInterface.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestInterface.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestInterface.java
index 9f17756..2ef4bae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestInterface.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestInterface.java
@@ -98,4 +98,8 @@ public interface GridNearAtomicUpdateRequestInterface {
public EntryProcessor<Object, Object, Object> entryProcessor(int idx);
public CacheObject writeValue(int idx);
+
+ @Nullable public GridNearAtomicUpdateResponse response();
+
+ public boolean onResponse(GridNearAtomicUpdateResponse res);
}