You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/08/11 09:27:13 UTC

[04/50] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates

IGNITE-104 - Ordered ATOMIC updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e5c69b83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e5c69b83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e5c69b83

Branch: refs/heads/ignite-426
Commit: e5c69b831a8f564440bd0960cc2a865cd907525a
Parents: 424ab07
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Wed Jul 29 14:19:24 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Wed Jul 29 14:19:24 2015 -0700

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          |  9 ++++--
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   | 33 +++++++++++++++-----
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  7 ++++-
 3 files changed, 37 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5c69b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 7a8cc06..02e48df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1178,6 +1178,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 e.printStackTrace();
             }
             finally {
+                if (dhtFut != null && !remap)
+                    dhtFut.map();
+
                 if (locked != null)
                     unlockEntries(locked, req.topologyVersion());
 
@@ -1221,8 +1224,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         else {
             // If there are backups, map backup update future.
             if (dhtFut != null)
-                dhtFut.map();
-                // Otherwise, complete the call.
+                dhtFut.onMapped();
+            // Otherwise, complete the call.
             else
                 completionCb.apply(req, res);
         }
@@ -2523,7 +2526,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         }
         catch (ClusterTopologyCheckedException ignored) {
             U.warn(log, "Failed to send DHT atomic update response to node because it left grid: " +
-                req.nodeId());
+                nodeId);
         }
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to send DHT atomic update response (did node leave grid?) [nodeId=" + nodeId +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5c69b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 3a68263..15ec121 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -90,6 +90,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     /** */
     private boolean waitForExchange;
 
+    /** */
+    private boolean mapped;
+
     /**
      * @param cctx Cache context.
      * @param completionCb Callback to invoke when future is completed.
@@ -349,37 +352,51 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                 GridAtomicMappingKey mappingKey = e.getKey();
                 GridDhtAtomicUpdateRequest req = e.getValue();
 
+                UUID nodeId = mappingKey.nodeId();
+                int part = mappingKey.partition();
+
+                assert !nodeId.equals(cctx.localNodeId());
+
                 try {
                     if (log.isDebugEnabled())
-                        log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
+                        log.debug("Sending DHT atomic update request [nodeId=" + nodeId + ", req=" + req + ']');
 
-                    if (mappingKey.partition() >= 0) {
-                        Object topic = CU.partitionMessageTopic(cctx, mappingKey.partition(), false);
+                    if (part >= 0) {
+                        Object topic = CU.partitionMessageTopic(cctx, part, false);
 
-                        cctx.io().sendOrderedMessage(mappingKey.nodeId(), topic, req, cctx.ioPolicy(),
+                        cctx.io().sendOrderedMessage(nodeId, topic, req, cctx.ioPolicy(),
                             2 * cctx.gridConfig().getNetworkTimeout());
                     }
                     else {
-                        assert mappingKey.partition() == -1;
+                        assert part == -1;
 
-                        cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+                        cctx.io().send(nodeId, req, cctx.ioPolicy());
                     }
                 }
                 catch (ClusterTopologyCheckedException ignored) {
                     U.warn(log, "Failed to send update request to backup node because it left grid: " +
-                        req.nodeId());
+                        nodeId);
 
                     mappings.remove(mappingKey);
                 }
                 catch (IgniteCheckedException ex) {
                     U.error(log, "Failed to send update request to backup node (did node leave the grid?): "
-                        + req.nodeId(), ex);
+                        + nodeId, ex);
 
                     mappings.remove(mappingKey);
                 }
             }
         }
 
+        mapped = true;
+    }
+
+    /**
+     * On mapped callback.
+     */
+    public void onMapped() {
+        assert mapped;
+
         checkComplete();
 
         // Send response right away if no ACKs from backup is required.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5c69b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 35c6910..7149dec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -44,7 +44,12 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     /** Message index. */
     public static final int CACHE_MSG_IDX = nextIndexId();
 
-    /** Node ID. */
+    /**
+     * Node ID.
+     *
+     * @deprecated Not used anymore, but removal will break compatibility.
+     */
+    @Deprecated
     private UUID nodeId;
 
     /** Future version. */