You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/08/11 09:27:13 UTC
[04/50] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates
IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e5c69b83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e5c69b83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e5c69b83
Branch: refs/heads/ignite-426
Commit: e5c69b831a8f564440bd0960cc2a865cd907525a
Parents: 424ab07
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Wed Jul 29 14:19:24 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Wed Jul 29 14:19:24 2015 -0700
----------------------------------------------------------------------
.../dht/atomic/GridDhtAtomicCache.java | 9 ++++--
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 33 +++++++++++++++-----
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 7 ++++-
3 files changed, 37 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5c69b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 7a8cc06..02e48df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1178,6 +1178,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
e.printStackTrace();
}
finally {
+ if (dhtFut != null && !remap)
+ dhtFut.map();
+
if (locked != null)
unlockEntries(locked, req.topologyVersion());
@@ -1221,8 +1224,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
else {
// If there are backups, map backup update future.
if (dhtFut != null)
- dhtFut.map();
- // Otherwise, complete the call.
+ dhtFut.onMapped();
+ // Otherwise, complete the call.
else
completionCb.apply(req, res);
}
@@ -2523,7 +2526,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
catch (ClusterTopologyCheckedException ignored) {
U.warn(log, "Failed to send DHT atomic update response to node because it left grid: " +
- req.nodeId());
+ nodeId);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send DHT atomic update response (did node leave grid?) [nodeId=" + nodeId +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5c69b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 3a68263..15ec121 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -90,6 +90,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
/** */
private boolean waitForExchange;
+ /** */
+ private boolean mapped;
+
/**
* @param cctx Cache context.
* @param completionCb Callback to invoke when future is completed.
@@ -349,37 +352,51 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
GridAtomicMappingKey mappingKey = e.getKey();
GridDhtAtomicUpdateRequest req = e.getValue();
+ UUID nodeId = mappingKey.nodeId();
+ int part = mappingKey.partition();
+
+ assert !nodeId.equals(cctx.localNodeId());
+
try {
if (log.isDebugEnabled())
- log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
+ log.debug("Sending DHT atomic update request [nodeId=" + nodeId + ", req=" + req + ']');
- if (mappingKey.partition() >= 0) {
- Object topic = CU.partitionMessageTopic(cctx, mappingKey.partition(), false);
+ if (part >= 0) {
+ Object topic = CU.partitionMessageTopic(cctx, part, false);
- cctx.io().sendOrderedMessage(mappingKey.nodeId(), topic, req, cctx.ioPolicy(),
+ cctx.io().sendOrderedMessage(nodeId, topic, req, cctx.ioPolicy(),
2 * cctx.gridConfig().getNetworkTimeout());
}
else {
- assert mappingKey.partition() == -1;
+ assert part == -1;
- cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+ cctx.io().send(nodeId, req, cctx.ioPolicy());
}
}
catch (ClusterTopologyCheckedException ignored) {
U.warn(log, "Failed to send update request to backup node because it left grid: " +
- req.nodeId());
+ nodeId);
mappings.remove(mappingKey);
}
catch (IgniteCheckedException ex) {
U.error(log, "Failed to send update request to backup node (did node leave the grid?): "
- + req.nodeId(), ex);
+ + nodeId, ex);
mappings.remove(mappingKey);
}
}
}
+ mapped = true;
+ }
+
+ /**
+ * On mapped callback.
+ */
+ public void onMapped() {
+ assert mapped;
+
checkComplete();
// Send response right away if no ACKs from backup is required.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5c69b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 35c6910..7149dec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -44,7 +44,12 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
/** Message index. */
public static final int CACHE_MSG_IDX = nextIndexId();
- /** Node ID. */
+ /**
+ * Node ID.
+ *
+ * @deprecated Not used anymore, but removal will break compatibility.
+ */
+ @Deprecated
private UUID nodeId;
/** Future version. */