You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/17 14:15:39 UTC

[10/10] ignite git commit: ignite-4705

ignite-4705


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2029d5a1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2029d5a1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2029d5a1

Branch: refs/heads/ignite-4705
Commit: 2029d5a1ece985eec7c0140f7dd35421ba130e70
Parents: 035a7bb
Author: sboikov <sb...@gridgain.com>
Authored: Fri Feb 17 17:15:24 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Feb 17 17:15:24 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAtomicFuture.java |  1 -
 .../processors/cache/GridCacheReturn.java       |  2 +-
 .../GridDhtAtomicAbstractUpdateFuture.java      | 97 +++++++++++---------
 .../dht/atomic/GridDhtAtomicCache.java          | 16 +---
 .../atomic/GridDhtAtomicSingleUpdateFuture.java |  2 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  2 +-
 6 files changed, 62 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2029d5a1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 565f11e..8df229e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 
 /**
  * Update future for atomic cache.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2029d5a1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
index 02c882c..80f43fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
@@ -131,7 +131,7 @@ public class GridCacheReturn implements Externalizable, Message {
      * @return {@code True} if value is not {@code null}.
      */
     public boolean hasValue() {
-        return v != null;
+        return invokeRes || v != null || cacheObj != null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/2029d5a1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 1c83163..da6616b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -49,7 +49,9 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 
 /**
  * DHT atomic cache backup update future.
@@ -311,7 +313,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
     /** {@inheritDoc} */
     @Override public final boolean onNodeLeft(UUID nodeId) {
-        boolean res = registerResponse(nodeId);
+        boolean res = registerResponse(nodeId, true);
 
         if (res && msgLog.isDebugEnabled()) {
             msgLog.debug("DTH update fut, node left [futId=" + futId + ", writeVer=" + writeVer +
@@ -323,9 +325,10 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
     /**
      * @param nodeId Node ID.
+     * @param nodeErr Node error flag.
      * @return {@code True} if request found.
      */
-    final boolean registerResponse(UUID nodeId) {
+    final boolean registerResponse(UUID nodeId, boolean nodeErr) {
         int resCnt0;
 
         GridDhtAtomicAbstractUpdateRequest req = mappings != null ? mappings.get(nodeId) : null;
@@ -359,54 +362,69 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      */
     final void map(GridCacheReturn ret) {
         boolean fullSync = updateReq.writeSynchronizationMode() == FULL_SYNC;
+        boolean primaryReplyToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC || ret.hasValue();
+
+        List<UUID> dhtNodes = null;
+
+        if (fullSync) {
+            dhtNodes = new ArrayList<>(mappings.size());
+
+            dhtNodes.addAll(mappings.keySet());
+
+            if (primaryReplyToNear)
+                updateRes.mapping(dhtNodes);
+        }
 
         if (!F.isEmpty(mappings)) {
-            List<UUID> dhtNodes = null;
+            sendDhtRequests(fullSync && !primaryReplyToNear, dhtNodes, ret);
 
-            if (fullSync) {
-                dhtNodes = new ArrayList<>(mappings.size());
+            if (primaryReplyToNear)
+                completionCb.apply(updateReq, updateRes);
+        }
+        else {
+            // Reply.
+            completionCb.apply(updateReq, updateRes);
 
-                dhtNodes.addAll(mappings.keySet());
-            }
+            onDone();
+        }
+    }
 
-            for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) {
-                try {
+    /**
+     * @param nearReplyInfo {@code True} if need add inforamtion for near node response.
+     * @param dhtNodes DHT nodes.
+     * @param ret Return value.
+     */
+    private void sendDhtRequests(boolean nearReplyInfo, List<UUID> dhtNodes, GridCacheReturn ret) {
+        for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) {
+            try {
+                if (nearReplyInfo) {
                     req.dhtNodes(dhtNodes);
-                    req.setResult(ret.success());
-
-                    cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 
-                    if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("DTH update fut, sent request [futId=" + futId +
-                            ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
-                    }
+                    if (!ret.hasValue())
+                        req.setResult(ret.success());
                 }
-                catch (ClusterTopologyCheckedException ignored) {
-                    if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("DTH update fut, failed to send request, node left [futId=" + futId +
-                            ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
-                    }
 
-                    registerResponse(req.nodeId());
-                }
-                catch (IgniteCheckedException ignored) {
-                    U.error(msgLog, "Failed to send request [futId=" + futId +
-                        ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
+                cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 
-                    registerResponse(req.nodeId());
+                if (msgLog.isDebugEnabled()) {
+                    msgLog.debug("DTH update fut, sent request [futId=" + futId +
+                        ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
                 }
             }
+            catch (ClusterTopologyCheckedException ignored) {
+                if (msgLog.isDebugEnabled()) {
+                    msgLog.debug("DTH update fut, failed to send request, node left [futId=" + futId +
+                        ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
+                }
 
-            // Send response right away if no ACKs from backup is required.
-            // Backups will send ACKs anyway, future will be completed after all backups have replied.
-            if (!fullSync)
-                completionCb.apply(updateReq, updateRes);
-        }
-        else {
-            // Reply.
-            completionCb.apply(updateReq, updateRes);
+                registerResponse(req.nodeId(), true);
+            }
+            catch (IgniteCheckedException ignored) {
+                U.error(msgLog, "Failed to send request [futId=" + futId +
+                    ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
 
-            onDone();
+                registerResponse(req.nodeId(), true);
+            }
         }
     }
 
@@ -419,12 +437,12 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         if (log.isDebugEnabled())
             log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
 
-        registerResponse(nodeId);
+        registerResponse(nodeId, false);
     }
 
     /**
      * @param nodeId Node ID.
-     * @param nodeId Near node ID.
+     * @param nearNodeId Near node ID.
      * @param futId Future ID.
      * @param writeVer Update version.
      * @param syncMode Write synchronization mode.
@@ -474,9 +492,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                 for (CI1<Boolean> clsr : cntQryClsrs)
                     clsr.apply(suc);
             }
-//
-//            if (updateReq.writeSynchronizationMode() == FULL_SYNC)
-//                completionCb.apply(updateReq, updateRes);
 
             return true;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2029d5a1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 2f6e320..d402c86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -110,6 +110,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_AC
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -210,18 +211,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         updateReplyClos = new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() {
             @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
             @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
-//                if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
-//                    assert req.writeSynchronizationMode() != FULL_ASYNC : req;
-//
-//                    // Always send reply in CLOCK ordering mode.
-//                    sendNearUpdateReply(res.nodeId(), res);
-//
-//                    return;
-//                }
-//
-//                // Request should be for primary keys only in PRIMARY ordering mode.
-//                assert req.hasPrimary() : req;
-
                 if (req.writeSynchronizationMode() != FULL_ASYNC)
                     sendNearUpdateReply(res.nodeId(), res);
                 else {
@@ -1970,9 +1959,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         }
         else {
             // If there are backups, map backup update future.
-            if (dhtFut != null)
+            if (dhtFut != null) {
                 dhtFut.map(res.returnValue());
                 // Otherwise, complete the call.
+            }
             else
                 completionCb.apply(req, res);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2029d5a1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 671034c..4ee90a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -159,7 +159,7 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
             }
         }
 
-        registerResponse(nodeId);
+        registerResponse(nodeId, false);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2029d5a1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index ea6a1b6..20c3d4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -112,7 +112,7 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
             }
         }
 
-        registerResponse(nodeId);
+        registerResponse(nodeId, false);
     }
 
     /** {@inheritDoc} */