You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/03 10:47:08 UTC

[5/8] ignite git commit: IGNITE-2532: Single update request is finalyl wired up. Though, it is not optimzied yet.

IGNITE-2532: Single update request is finalyl wired up. Though, it is not optimzied yet.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2a1a31d2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2a1a31d2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2a1a31d2

Branch: refs/heads/ignite-2523
Commit: 2a1a31d2d0999fdb8854a05fd7a75a4cd0b159a4
Parents: 29c2aee
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Feb 3 11:36:39 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 3 11:36:39 2016 +0300

----------------------------------------------------------------------
 .../GridNearAbstractAtomicUpdateFuture.java     |   4 +
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 142 +++++++++++++------
 .../dht/atomic/GridNearAtomicUpdateRequest.java |   3 +-
 .../GridNearAtomicUpdateRequestInterface.java   |   4 +
 4 files changed, 107 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2a1a31d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
index 60e0c5f..f8c6810 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -53,6 +54,9 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA
  */
 public abstract class GridNearAbstractAtomicUpdateFuture extends GridFutureAdapter<Object>
     implements GridCacheAtomicFuture<Object> {
+    /** */
+    public static final IgniteProductVersion SINGLE_PUT_MSG_SINCE = IgniteProductVersion.fromString("1.6.0");
+
     /** Logger reference. */
     protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a1a31d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 2aa510d..493c765 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -108,7 +108,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
     private Collection<KeyCacheObject> remapKeys;
 
     /** Not null is operation is mapped to single node. */
-    private GridNearAtomicUpdateRequest singleReq;
+    private GridNearAtomicUpdateRequestInterface singleReq;
 
     /** Operation result. */
     private GridCacheReturn opRes;    
@@ -195,7 +195,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
         GridNearAtomicUpdateResponse res = null;
 
         synchronized (this) {
-            GridNearAtomicUpdateRequest req;
+            GridNearAtomicUpdateRequestInterface req;
 
             if (singleReq != null)
                 req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
@@ -339,11 +339,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
      * @param nodeId Node ID.
      * @param req Request.
      */
-    private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
+    private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequestInterface req) {
         if (cctx.localNodeId().equals(nodeId)) {
             cache.updateAllAsyncInternal(nodeId, req,
-                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
-                    @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+                new CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse>() {
+                    @Override public void apply(GridNearAtomicUpdateRequestInterface req, GridNearAtomicUpdateResponse res) {
                         onResult(res.nodeId(), res);
                     }
                 });
@@ -353,7 +353,13 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
                 if (log.isDebugEnabled())
                     log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
 
-                cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+                if (req instanceof GridNearAtomicUpdateRequest)
+                    cctx.io().send(req.nodeId(), (GridNearAtomicUpdateRequest)req, cctx.ioPolicy());
+                else {
+                    assert req instanceof GridNearAtomicSingleUpdateRequest;
+
+                    cctx.io().send(req.nodeId(), (GridNearAtomicSingleUpdateRequest)req, cctx.ioPolicy());
+                }
 
                 if (syncMode == FULL_ASYNC)
                     onDone(new GridCacheReturn(cctx, true, true, null, true));
@@ -372,7 +378,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
     private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
         UUID locNodeId = cctx.localNodeId();
 
-        GridNearAtomicUpdateRequest locUpdate = null;
+        GridNearAtomicUpdateRequestInterface locUpdate = null;
 
         // Send messages to remote nodes first, then run local update.
         for (GridNearAtomicUpdateRequest req : mappings.values()) {
@@ -415,7 +421,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
      */
     @SuppressWarnings("unchecked")
     void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
-        GridNearAtomicUpdateRequest req;
+        GridNearAtomicUpdateRequestInterface req;
 
         AffinityTopologyVersion remapTopVer = null;
 
@@ -545,7 +551,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
 
         if (rcvAll && nearEnabled) {
             if (mappings != null) {
-                for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
+                for (GridNearAtomicUpdateRequestInterface req0 : mappings.values()) {
                     GridNearAtomicUpdateResponse res0 = req0.response();
 
                     assert res0 != null : req0;
@@ -619,7 +625,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
      * @param req Request.
      * @param e Error.
      */
-    void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
+    void onSendError(GridNearAtomicUpdateRequestInterface req, IgniteCheckedException e) {
         synchronized (this) {
             GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
                 req.nodeId(),
@@ -647,7 +653,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
         }
 
         Exception err = null;
-        GridNearAtomicUpdateRequest singleReq0 = null;
+        GridNearAtomicUpdateRequestInterface singleReq0 = null;
         Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
 
         int size = keys.size();
@@ -674,7 +680,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
             if (size == 1 && !fastMap) {
                 assert remapKeys == null || remapKeys.size() == 1;
 
-                singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+                singleReq0 = mapSingleUpdate(topVer, topNodes, futVer, updVer);
             }
             else {
                 Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
@@ -799,6 +805,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
      * @return Mapping.
      * @throws Exception If failed.
      */
+    @SuppressWarnings("ConstantConditions")
     private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
         AffinityTopologyVersion topVer,
         GridCacheVersion futVer,
@@ -926,14 +933,14 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
 
     /**
      * @param topVer Topology version.
+     * @param topNodes Topology nodes.
      * @param futVer Future version.
      * @param updVer Update version.
      * @return Request.
      * @throws Exception If failed.
      */
-    private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
-        GridCacheVersion futVer,
-        @Nullable GridCacheVersion updVer) throws Exception {
+    private GridNearAtomicUpdateRequestInterface mapSingleUpdate(AffinityTopologyVersion topVer,
+        Collection<ClusterNode> topNodes, GridCacheVersion futVer, @Nullable GridCacheVersion updVer) throws Exception {
         Object key = F.first(keys);
 
         Object val;
@@ -990,36 +997,81 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
             throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
                 "left the grid).");
 
-        GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
-            cctx.cacheId(),
-            primary.id(),
-            futVer,
-            fastMap,
-            updVer,
-            topVer,
-            topLocked,
-            syncMode,
-            op,
-            retval,
-            expiryPlc,
-            invokeArgs,
-            filter,
-            subjId,
-            taskNameHash,
-            skipStore,
-            keepBinary,
-            cctx.kernalContext().clientNode(),
-            cctx.deploymentEnabled(),
-            1);
-
-        req.addUpdateEntry(cacheKey,
-            val,
-            conflictTtl,
-            conflictExpireTime,
-            conflictVer,
-            true);
-
-        return req;
+        // Decide whether we will use optimzied version of update request.
+        boolean optimize = true;
+
+        for (ClusterNode topNode : topNodes) {
+            if (topNode.version().compareTo(SINGLE_PUT_MSG_SINCE) < 0) {
+                optimize = false;
+
+                break;
+            }
+        }
+
+        if (optimize) {
+            GridNearAtomicSingleUpdateRequest req = new GridNearAtomicSingleUpdateRequest(
+                cctx.cacheId(),
+                primary.id(),
+                futVer,
+                fastMap,
+                updVer,
+                topVer,
+                topLocked,
+                syncMode,
+                op,
+                retval,
+                expiryPlc,
+                invokeArgs,
+                filter,
+                subjId,
+                taskNameHash,
+                skipStore,
+                keepBinary,
+                cctx.kernalContext().clientNode(),
+                cctx.deploymentEnabled(),
+                1);
+
+            req.addUpdateEntry(cacheKey,
+                val,
+                conflictTtl,
+                conflictExpireTime,
+                conflictVer,
+                true);
+
+            return req;
+        }
+        else {
+            GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
+                cctx.cacheId(),
+                primary.id(),
+                futVer,
+                fastMap,
+                updVer,
+                topVer,
+                topLocked,
+                syncMode,
+                op,
+                retval,
+                expiryPlc,
+                invokeArgs,
+                filter,
+                subjId,
+                taskNameHash,
+                skipStore,
+                keepBinary,
+                cctx.kernalContext().clientNode(),
+                cctx.deploymentEnabled(),
+                1);
+
+            req.addUpdateEntry(cacheKey,
+                val,
+                conflictTtl,
+                conflictExpireTime,
+                conflictVer,
+                true);
+
+            return req;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a1a31d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 340dbf6..674a5be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -373,6 +373,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage
      * @param conflictVer Conflict version (optional).
      * @param primary If given key is primary on this mapping.
      */
+    @SuppressWarnings("unchecked")
     public void addUpdateEntry(KeyCacheObject key,
         @Nullable Object val,
         long conflictTtl,
@@ -384,7 +385,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage
         if (op == TRANSFORM) {
             assert val instanceof EntryProcessor : val;
 
-            entryProcessor = (EntryProcessor<Object, Object, Object>) val;
+            entryProcessor = (EntryProcessor<Object, Object, Object>)val;
         }
 
         assert val != null || op == DELETE;

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a1a31d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestInterface.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestInterface.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestInterface.java
index 9f17756..2ef4bae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestInterface.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestInterface.java
@@ -98,4 +98,8 @@ public interface GridNearAtomicUpdateRequestInterface {
     public EntryProcessor<Object, Object, Object> entryProcessor(int idx);
 
     public CacheObject writeValue(int idx);
+
+    @Nullable public GridNearAtomicUpdateResponse response();
+
+    public boolean onResponse(GridNearAtomicUpdateResponse res);
 }