You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/08/11 09:27:29 UTC

[20/50] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates

IGNITE-104 - Ordered ATOMIC updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8e6b90cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8e6b90cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8e6b90cf

Branch: refs/heads/ignite-426
Commit: 8e6b90cf835fe67778b961dccec0682f448f8e57
Parents: b27af71
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Mon Aug 3 17:57:25 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Mon Aug 3 17:57:25 2015 -0700

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          |  6 ++
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 78 +++++---------------
 2 files changed, 25 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e6b90cf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4f1b887..a8dc8ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -134,6 +134,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         updateReplyClos = new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
             @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
             @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+                if (res.nodeId().equals(locNodeId)) {
+                    processNearAtomicUpdateResponse(res.nodeId(), res);
+
+                    return;
+                }
+
                 if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
                     // Always send reply in CLOCK ordering mode.
                     sendNearUpdateReply(res.nodeId(), res);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e6b90cf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index ff24964..5150113 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -1047,30 +1047,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         singleNodeId = mappingKey.nodeId();
         singleReq = req;
 
-        if (cctx.localNodeId().equals(mappingKey.nodeId())) {
-            cache.updateAllAsyncInternal(mappingKey.nodeId(), req,
-                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
-                    @Override public void apply(GridNearAtomicUpdateRequest req,
-                        GridNearAtomicUpdateResponse res) {
-                        assert res.futureVersion().equals(futVer) : futVer;
-
-                        onResult(res.nodeId(), res);
-                    }
-                });
-        }
-        else {
-            try {
-                if (log.isDebugEnabled())
-                    log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
 
-                sendRequest(mappingKey, req);
+            sendRequest(mappingKey, req);
 
-                if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY)
-                    onDone(new GridCacheReturn(cctx, true, null, true));
-            }
-            catch (IgniteCheckedException e) {
-                onDone(addFailedKeys(req.keys(), e));
-            }
+            if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY)
+                onDone(new GridCacheReturn(cctx, true, null, true));
+        }
+        catch (IgniteCheckedException e) {
+            onDone(addFailedKeys(req.keys(), e));
         }
     }
 
@@ -1080,57 +1067,30 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      * @param mappings Mappings to send.
      */
     private void doUpdate(Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings) {
-        UUID locNodeId = cctx.localNodeId();
-
-        Collection<GridNearAtomicUpdateRequest> locUpdates = null;
-
-        // Send messages to remote nodes first, then run local update.
         for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
             GridAtomicMappingKey mappingKey = e.getKey();
             GridNearAtomicUpdateRequest req = e.getValue();
 
-            if (locNodeId.equals(req.nodeId())) {
-                if (locUpdates == null)
-                    locUpdates = new ArrayList<>(mappings.size());
+            try {
+                if (log.isDebugEnabled())
+                    log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
 
-                locUpdates.add(req);
+                sendRequest(mappingKey, req);
             }
-            else {
-                try {
-                    if (log.isDebugEnabled())
-                        log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
-
-                    sendRequest(mappingKey, req);
-                }
-                catch (IgniteCheckedException ex) {
-                    addFailedKeys(req.keys(), ex);
-
-                    removeMapping(mappingKey);
-                }
+            catch (IgniteCheckedException ex) {
+                addFailedKeys(req.keys(), ex);
 
-                if (syncMode == PRIMARY_SYNC && !req.hasPrimary())
-                    removeMapping(mappingKey);
+                removeMapping(mappingKey);
             }
+
+            if (syncMode == PRIMARY_SYNC && !req.hasPrimary())
+                removeMapping(mappingKey);
         }
 
         if (syncMode == FULL_ASYNC)
             // In FULL_ASYNC mode always return (null, true).
             opRes = new GridCacheReturn(cctx, true, null, true);
 
-        if (locUpdates != null) {
-            for (GridNearAtomicUpdateRequest locUpdate : locUpdates) {
-                cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
-                    new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
-                        @Override public void apply(GridNearAtomicUpdateRequest req,
-                            GridNearAtomicUpdateResponse res) {
-                            assert res.futureVersion().equals(futVer) : futVer;
-
-                            onResult(res.nodeId(), res);
-                        }
-                    });
-            }
-        }
-
         checkComplete();
     }