You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/17 14:15:39 UTC
[10/10] ignite git commit: ignite-4705
ignite-4705
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2029d5a1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2029d5a1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2029d5a1
Branch: refs/heads/ignite-4705
Commit: 2029d5a1ece985eec7c0140f7dd35421ba130e70
Parents: 035a7bb
Author: sboikov <sb...@gridgain.com>
Authored: Fri Feb 17 17:15:24 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Feb 17 17:15:24 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAtomicFuture.java | 1 -
.../processors/cache/GridCacheReturn.java | 2 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 97 +++++++++++---------
.../dht/atomic/GridDhtAtomicCache.java | 16 +---
.../atomic/GridDhtAtomicSingleUpdateFuture.java | 2 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 2 +-
6 files changed, 62 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2029d5a1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 565f11e..8df229e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
/**
* Update future for atomic cache.
http://git-wip-us.apache.org/repos/asf/ignite/blob/2029d5a1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
index 02c882c..80f43fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
@@ -131,7 +131,7 @@ public class GridCacheReturn implements Externalizable, Message {
* @return {@code True} if value is not {@code null}.
*/
public boolean hasValue() {
- return v != null;
+ return invokeRes || v != null || cacheObj != null;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/2029d5a1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 1c83163..da6616b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -49,7 +49,9 @@ import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
/**
* DHT atomic cache backup update future.
@@ -311,7 +313,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
/** {@inheritDoc} */
@Override public final boolean onNodeLeft(UUID nodeId) {
- boolean res = registerResponse(nodeId);
+ boolean res = registerResponse(nodeId, true);
if (res && msgLog.isDebugEnabled()) {
msgLog.debug("DTH update fut, node left [futId=" + futId + ", writeVer=" + writeVer +
@@ -323,9 +325,10 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
/**
* @param nodeId Node ID.
+ * @param nodeErr Node error flag.
* @return {@code True} if request found.
*/
- final boolean registerResponse(UUID nodeId) {
+ final boolean registerResponse(UUID nodeId, boolean nodeErr) {
int resCnt0;
GridDhtAtomicAbstractUpdateRequest req = mappings != null ? mappings.get(nodeId) : null;
@@ -359,54 +362,69 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
*/
final void map(GridCacheReturn ret) {
boolean fullSync = updateReq.writeSynchronizationMode() == FULL_SYNC;
+ boolean primaryReplyToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC || ret.hasValue();
+
+ List<UUID> dhtNodes = null;
+
+ if (fullSync) {
+ dhtNodes = new ArrayList<>(mappings.size());
+
+ dhtNodes.addAll(mappings.keySet());
+
+ if (primaryReplyToNear)
+ updateRes.mapping(dhtNodes);
+ }
if (!F.isEmpty(mappings)) {
- List<UUID> dhtNodes = null;
+ sendDhtRequests(fullSync && !primaryReplyToNear, dhtNodes, ret);
- if (fullSync) {
- dhtNodes = new ArrayList<>(mappings.size());
+ if (primaryReplyToNear)
+ completionCb.apply(updateReq, updateRes);
+ }
+ else {
+ // Reply.
+ completionCb.apply(updateReq, updateRes);
- dhtNodes.addAll(mappings.keySet());
- }
+ onDone();
+ }
+ }
- for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) {
- try {
+ /**
+ * @param nearReplyInfo {@code True} if information for the near node response needs to be added.
+ * @param dhtNodes DHT nodes.
+ * @param ret Return value.
+ */
+ private void sendDhtRequests(boolean nearReplyInfo, List<UUID> dhtNodes, GridCacheReturn ret) {
+ for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) {
+ try {
+ if (nearReplyInfo) {
req.dhtNodes(dhtNodes);
- req.setResult(ret.success());
-
- cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DTH update fut, sent request [futId=" + futId +
- ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
- }
+ if (!ret.hasValue())
+ req.setResult(ret.success());
}
- catch (ClusterTopologyCheckedException ignored) {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DTH update fut, failed to send request, node left [futId=" + futId +
- ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
- }
- registerResponse(req.nodeId());
- }
- catch (IgniteCheckedException ignored) {
- U.error(msgLog, "Failed to send request [futId=" + futId +
- ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
+ cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
- registerResponse(req.nodeId());
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DTH update fut, sent request [futId=" + futId +
+ ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
}
}
+ catch (ClusterTopologyCheckedException ignored) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DTH update fut, failed to send request, node left [futId=" + futId +
+ ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
+ }
- // Send response right away if no ACKs from backup is required.
- // Backups will send ACKs anyway, future will be completed after all backups have replied.
- if (!fullSync)
- completionCb.apply(updateReq, updateRes);
- }
- else {
- // Reply.
- completionCb.apply(updateReq, updateRes);
+ registerResponse(req.nodeId(), true);
+ }
+ catch (IgniteCheckedException ignored) {
+ U.error(msgLog, "Failed to send request [futId=" + futId +
+ ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
- onDone();
+ registerResponse(req.nodeId(), true);
+ }
}
}
@@ -419,12 +437,12 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
if (log.isDebugEnabled())
log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
- registerResponse(nodeId);
+ registerResponse(nodeId, false);
}
/**
* @param nodeId Node ID.
- * @param nodeId Near node ID.
+ * @param nearNodeId Near node ID.
* @param futId Future ID.
* @param writeVer Update version.
* @param syncMode Write synchronization mode.
@@ -474,9 +492,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
for (CI1<Boolean> clsr : cntQryClsrs)
clsr.apply(suc);
}
-//
-// if (updateReq.writeSynchronizationMode() == FULL_SYNC)
-// completionCb.apply(updateReq, updateRes);
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2029d5a1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 2f6e320..d402c86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -110,6 +110,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_AC
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -210,18 +211,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
updateReplyClos = new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
-// if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
-// assert req.writeSynchronizationMode() != FULL_ASYNC : req;
-//
-// // Always send reply in CLOCK ordering mode.
-// sendNearUpdateReply(res.nodeId(), res);
-//
-// return;
-// }
-//
-// // Request should be for primary keys only in PRIMARY ordering mode.
-// assert req.hasPrimary() : req;
-
if (req.writeSynchronizationMode() != FULL_ASYNC)
sendNearUpdateReply(res.nodeId(), res);
else {
@@ -1970,9 +1959,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
else {
// If there are backups, map backup update future.
- if (dhtFut != null)
+ if (dhtFut != null) {
dhtFut.map(res.returnValue());
// Otherwise, complete the call.
+ }
else
completionCb.apply(req, res);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2029d5a1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 671034c..4ee90a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -159,7 +159,7 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
}
}
- registerResponse(nodeId);
+ registerResponse(nodeId, false);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/2029d5a1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index ea6a1b6..20c3d4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -112,7 +112,7 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
}
}
- registerResponse(nodeId);
+ registerResponse(nodeId, false);
}
/** {@inheritDoc} */