You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/07/22 04:57:36 UTC

[3/3] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates

IGNITE-104 - Ordered ATOMIC updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9dec3b7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9dec3b7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9dec3b7b

Branch: refs/heads/ignite-104
Commit: 9dec3b7b1738a23d69b9b28ff24ea781693b323f
Parents: 67572e5
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Tue Jul 21 19:57:26 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Tue Jul 21 19:57:26 2015 -0700

----------------------------------------------------------------------
 .../dht/atomic/GridAtomicMappingKey.java        |  86 +++++++++++++++
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  84 ++++-----------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 106 +++++--------------
 3 files changed, 133 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9dec3b7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java
new file mode 100644
index 0000000..52e3c7f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+
+/**
+ * Key of an atomic update mapping: a node ID paired with a partition number ({@code -1} is allowed).
+ */
+class GridAtomicMappingKey {
+    /** Node ID, never {@code null}. */
+    private final UUID nodeId;
+
+    /** Partition, always {@code >= -1}. */
+    private final int part;
+
+    /**
+     * @param nodeId Node ID, must not be {@code null} (checked by assertion only).
+     * @param part Partition, must be {@code >= -1} (checked by assertion only).
+     */
+    GridAtomicMappingKey(UUID nodeId, int part) {
+        assert nodeId != null;
+        assert part >= -1 : part;
+
+        this.nodeId = nodeId;
+        this.part = part;
+    }
+
+    /**
+     * @return Node ID this key was created with.
+     */
+    UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return Partition this key was created with.
+     */
+    int partition() {
+        return part;
+    }
+
+    /** {@inheritDoc} Keys are equal when they have the same class, node ID and partition. */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        GridAtomicMappingKey key = (GridAtomicMappingKey)o;
+
+        return nodeId.equals(key.nodeId) && part == key.part;
+    }
+
+    /** {@inheritDoc} Combines the node ID hash with the partition, consistent with {@link #equals(Object)}. */
+    @Override public int hashCode() {
+        int res = nodeId.hashCode();
+
+        res = 31 * res + part;
+
+        return res;
+    }
+
+    /** {@inheritDoc} Delegates to {@code S.toString} for this class. */
+    @Override public String toString() {
+        return S.toString(GridAtomicMappingKey.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9dec3b7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 7100d3d..23b2161 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -73,7 +73,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
     /** Mappings. */
     @GridToStringInclude
-    private ConcurrentMap<MappingKey, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>();
+    private ConcurrentMap<GridAtomicMappingKey, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>();
 
     /** Entries with readers. */
     private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
@@ -135,9 +135,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
     /** {@inheritDoc} */
     @Override public Collection<? extends ClusterNode> nodes() {
-        return F.view(F.viewReadOnly(mappings.keySet(), new C1<MappingKey, ClusterNode>() {
-            @Override public ClusterNode apply(MappingKey mappingKey) {
-                return cctx.kernalContext().discovery().node(mappingKey.nodeId);
+        return F.view(F.viewReadOnly(mappings.keySet(), new C1<GridAtomicMappingKey, ClusterNode>() {
+            @Override public ClusterNode apply(GridAtomicMappingKey mappingKey) {
+                return cctx.kernalContext().discovery().node(mappingKey.nodeId());
             }
         }), F.notNull());
     }
@@ -147,15 +147,15 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         if (log.isDebugEnabled())
             log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']');
 
-        Collection<MappingKey> mappingKeys = new ArrayList<>(mappings.size());
+        Collection<GridAtomicMappingKey> mappingKeys = new ArrayList<>(mappings.size());
 
-        for (MappingKey mappingKey : mappings.keySet()) {
-            if (mappingKey.nodeId.equals(nodeId))
+        for (GridAtomicMappingKey mappingKey : mappings.keySet()) {
+            if (mappingKey.nodeId().equals(nodeId))
                 mappingKeys.add(mappingKey);
         }
 
         if (!mappingKeys.isEmpty()) {
-            for (MappingKey mappingKey : mappingKeys)
+            for (GridAtomicMappingKey mappingKey : mappingKeys)
                 mappings.remove(mappingKey);
 
             checkComplete();
@@ -227,7 +227,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         for (ClusterNode node : dhtNodes) {
             UUID nodeId = node.id();
 
-            MappingKey mappingKey = new MappingKey(nodeId, part);
+            GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part);
 
             if (!nodeId.equals(cctx.localNodeId())) {
                 GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey);
@@ -282,7 +282,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         int part = cctx.config().isAtomicOrderedUpdates() ? entry.partition() : -1;
 
         for (UUID nodeId : readers) {
-            MappingKey mappingKey = new MappingKey(nodeId, part);
+            GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part);
 
             GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey);
 
@@ -341,22 +341,22 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
      */
     public void map() {
         if (!mappings.isEmpty()) {
-            for (Map.Entry<MappingKey, GridDhtAtomicUpdateRequest> e : mappings.entrySet()) {
-                MappingKey mappingKey = e.getKey();
+            for (Map.Entry<GridAtomicMappingKey, GridDhtAtomicUpdateRequest> e : mappings.entrySet()) {
+                GridAtomicMappingKey mappingKey = e.getKey();
                 GridDhtAtomicUpdateRequest req = e.getValue();
 
                 try {
                     if (log.isDebugEnabled())
                         log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
 
-                    if (mappingKey.part >= 0) {
-                        Object topic = CU.partitionMessageTopic(cctx, mappingKey.part, false);
+                    if (mappingKey.partition() >= 0) {
+                        Object topic = CU.partitionMessageTopic(cctx, mappingKey.partition(), false);
 
-                        cctx.io().sendOrderedMessage(mappingKey.nodeId, topic, req, cctx.ioPolicy(),
+                        cctx.io().sendOrderedMessage(mappingKey.nodeId(), topic, req, cctx.ioPolicy(),
                             2 * cctx.gridConfig().getNetworkTimeout());
                     }
                     else {
-                        assert mappingKey.part == -1;
+                        assert mappingKey.partition() == -1;
 
                         cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
                     }
@@ -411,7 +411,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             }
         }
 
-        mappings.remove(new MappingKey(nodeId, updateRes.partition()));
+        mappings.remove(new GridAtomicMappingKey(nodeId, updateRes.partition()));
 
         checkComplete();
     }
@@ -427,7 +427,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
 
         for (Integer part : res.partitions())
-            mappings.remove(new MappingKey(nodeId, part));
+            mappings.remove(new GridAtomicMappingKey(nodeId, part));
 
         checkComplete();
     }
@@ -450,52 +450,4 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         return S.toString(GridDhtAtomicUpdateFuture.class, this);
     }
 
-    /**
-     */
-    private static class MappingKey {
-        /** Node ID. */
-        private final UUID nodeId;
-
-        /** Partition. */
-        private final int part;
-
-        /**
-         * @param nodeId Node ID.
-         * @param part Partition.
-         */
-        private MappingKey(UUID nodeId, int part) {
-            assert nodeId != null;
-            assert part >= -1 : part;
-
-            this.nodeId = nodeId;
-            this.part = part;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            MappingKey key = (MappingKey)o;
-
-            return nodeId.equals(key.nodeId) && part == key.part;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = nodeId.hashCode();
-
-            res = 31 * res + part;
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(MappingKey.class, this);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9dec3b7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index ef3a18b..9b2a5e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -90,7 +90,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
     /** Mappings. */
     @GridToStringInclude
-    private ConcurrentMap<MappingKey, GridNearAtomicUpdateRequest> mappings;
+    private ConcurrentMap<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings;
 
     /** Error. */
     private volatile CachePartialUpdateCheckedException err;
@@ -246,9 +246,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
     /** {@inheritDoc} */
     @Override public Collection<? extends ClusterNode> nodes() {
-        return F.view(F.viewReadOnly(mappings.keySet(), new C1<MappingKey, ClusterNode>() {
-            @Override public ClusterNode apply(MappingKey mappingKey) {
-                return cctx.kernalContext().discovery().node(mappingKey.nodeId);
+        return F.view(F.viewReadOnly(mappings.keySet(), new C1<GridAtomicMappingKey, ClusterNode>() {
+            @Override public ClusterNode apply(GridAtomicMappingKey mappingKey) {
+                return cctx.kernalContext().discovery().node(mappingKey.nodeId());
             }
         }), F.notNull());
     }
@@ -287,11 +287,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             return false;
         }
 
-        Collection<MappingKey> mappingKeys = new ArrayList<>(mappings.size());
+        Collection<GridAtomicMappingKey> mappingKeys = new ArrayList<>(mappings.size());
         Collection<KeyCacheObject> failedKeys = new ArrayList<>();
 
-        for (Map.Entry<MappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
-            if (e.getKey().nodeId.equals(nodeId)) {
+        for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
+            if (e.getKey().nodeId().equals(nodeId)) {
                 mappingKeys.add(e.getKey());
 
                 failedKeys.addAll(e.getValue().keys());
@@ -303,7 +303,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 addFailedKeys(failedKeys, new ClusterTopologyCheckedException("Primary node left grid before " +
                     "response is received: " + nodeId));
 
-            for (MappingKey key : mappingKeys)
+            for (GridAtomicMappingKey key : mappingKeys)
                 mappings.remove(key);
 
             checkComplete();
@@ -544,7 +544,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             }
         }
         else {
-            MappingKey mappingKey = new MappingKey(nodeId, res.partition());
+            GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, res.partition());
 
             GridNearAtomicUpdateRequest req = mappings.get(mappingKey);
 
@@ -827,7 +827,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             }
 
             // Optimize mapping for single key.
-            mapSingle(new MappingKey(primary.id(), part), req);
+            mapSingle(new GridAtomicMappingKey(primary.id(), part), req);
 
             return;
         }
@@ -847,15 +847,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         if (conflictRmvVals != null)
             conflictRmvValsIt = conflictRmvVals.iterator();
 
-        Map<MappingKey, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);
+        Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);
 
         // Must do this in synchronized block because we need to atomically remove and add mapping.
         // Otherwise checkComplete() may see empty intermediate state.
         synchronized (this) {
             if (oldNodeId != null) {
                 // TODO: IGNITE-104 - Try to avoid iteration.
-                for (Map.Entry<MappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
-                    if (e.getKey().nodeId.equals(oldNodeId))
+                for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
+                    if (e.getKey().nodeId().equals(oldNodeId))
                         mappings.remove(e.getKey());
                 }
             }
@@ -952,7 +952,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
                     UUID nodeId = affNode.id();
 
-                    MappingKey mappingKey = new MappingKey(nodeId, part);
+                    GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part);
 
                     GridNearAtomicUpdateRequest mapped = pendingMappings.get(mappingKey);
 
@@ -997,7 +997,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         }
 
         if ((single == null || single) && pendingMappings.size() == 1) {
-            Map.Entry<MappingKey, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet());
+            Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet());
 
             single = true;
 
@@ -1043,12 +1043,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      * @param mappingKey Mapping key.
      * @param req Request.
      */
-    private void mapSingle(MappingKey mappingKey, GridNearAtomicUpdateRequest req) {
-        singleNodeId = mappingKey.nodeId;
+    private void mapSingle(GridAtomicMappingKey mappingKey, GridNearAtomicUpdateRequest req) {
+        singleNodeId = mappingKey.nodeId();
         singleReq = req;
 
-        if (cctx.localNodeId().equals(mappingKey.nodeId)) {
-            cache.updateAllAsyncInternal(mappingKey.nodeId, req,
+        if (cctx.localNodeId().equals(mappingKey.nodeId())) {
+            cache.updateAllAsyncInternal(mappingKey.nodeId(), req,
                 new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
                     @Override public void apply(GridNearAtomicUpdateRequest req,
                         GridNearAtomicUpdateResponse res) {
@@ -1079,14 +1079,14 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      *
      * @param mappings Mappings to send.
      */
-    private void doUpdate(Map<MappingKey, GridNearAtomicUpdateRequest> mappings) {
+    private void doUpdate(Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings) {
         UUID locNodeId = cctx.localNodeId();
 
         Collection<GridNearAtomicUpdateRequest> locUpdates = null;
 
         // Send messages to remote nodes first, then run local update.
-        for (Map.Entry<MappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
-            MappingKey mappingKey = e.getKey();
+        for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
+            GridAtomicMappingKey mappingKey = e.getKey();
             GridNearAtomicUpdateRequest req = e.getValue();
 
             if (locNodeId.equals(req.nodeId())) {
@@ -1141,15 +1141,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      * @param req Update request.
      * @throws IgniteCheckedException In case of error.
      */
-    private void sendRequest(MappingKey mappingKey, GridNearAtomicUpdateRequest req) throws IgniteCheckedException {
-        if (mappingKey.part >= 0) {
-            Object topic = CU.partitionMessageTopic(cctx, mappingKey.part, true);
+    private void sendRequest(GridAtomicMappingKey mappingKey, GridNearAtomicUpdateRequest req)
+        throws IgniteCheckedException {
+        if (mappingKey.partition() >= 0) {
+            Object topic = CU.partitionMessageTopic(cctx, mappingKey.partition(), true);
 
-            cctx.io().sendOrderedMessage(mappingKey.nodeId, topic, req, cctx.ioPolicy(),
+            cctx.io().sendOrderedMessage(mappingKey.nodeId(), topic, req, cctx.ioPolicy(),
                 2 * cctx.gridConfig().getNetworkTimeout());
         }
         else {
-            assert mappingKey.part == -1;
+            assert mappingKey.partition() == -1;
 
             cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
         }
@@ -1160,7 +1161,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      *
      * @param mappingKey Mapping key.
      */
-    private void removeMapping(MappingKey mappingKey) {
+    private void removeMapping(GridAtomicMappingKey mappingKey) {
         mappings.remove(mappingKey);
     }
 
@@ -1205,53 +1206,4 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     public String toString() {
         return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString());
     }
-
-    /**
-     */
-    private static class MappingKey {
-        /** Node ID. */
-        private final UUID nodeId;
-
-        /** Partition. */
-        private final int part;
-
-        /**
-         * @param nodeId Node ID.
-         * @param part Partition.
-         */
-        private MappingKey(UUID nodeId, int part) {
-            assert nodeId != null;
-            assert part >= -1 : part;
-
-            this.nodeId = nodeId;
-            this.part = part;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            MappingKey key = (MappingKey)o;
-
-            return nodeId.equals(key.nodeId) && part == key.part;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = nodeId.hashCode();
-
-            res = 31 * res + part;
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(MappingKey.class, this);
-        }
-    }
 }