You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/08/11 09:27:29 UTC
[20/50] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates
IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8e6b90cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8e6b90cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8e6b90cf
Branch: refs/heads/ignite-426
Commit: 8e6b90cf835fe67778b961dccec0682f448f8e57
Parents: b27af71
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Mon Aug 3 17:57:25 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Mon Aug 3 17:57:25 2015 -0700
----------------------------------------------------------------------
.../dht/atomic/GridDhtAtomicCache.java | 6 ++
.../dht/atomic/GridNearAtomicUpdateFuture.java | 78 +++++---------------
2 files changed, 25 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e6b90cf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4f1b887..a8dc8ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -134,6 +134,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
updateReplyClos = new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+ if (res.nodeId().equals(locNodeId)) {
+ processNearAtomicUpdateResponse(res.nodeId(), res);
+
+ return;
+ }
+
if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
// Always send reply in CLOCK ordering mode.
sendNearUpdateReply(res.nodeId(), res);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e6b90cf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index ff24964..5150113 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -1047,30 +1047,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
singleNodeId = mappingKey.nodeId();
singleReq = req;
- if (cctx.localNodeId().equals(mappingKey.nodeId())) {
- cache.updateAllAsyncInternal(mappingKey.nodeId(), req,
- new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicUpdateRequest req,
- GridNearAtomicUpdateResponse res) {
- assert res.futureVersion().equals(futVer) : futVer;
-
- onResult(res.nodeId(), res);
- }
- });
- }
- else {
- try {
- if (log.isDebugEnabled())
- log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
- sendRequest(mappingKey, req);
+ sendRequest(mappingKey, req);
- if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY)
- onDone(new GridCacheReturn(cctx, true, null, true));
- }
- catch (IgniteCheckedException e) {
- onDone(addFailedKeys(req.keys(), e));
- }
+ if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY)
+ onDone(new GridCacheReturn(cctx, true, null, true));
+ }
+ catch (IgniteCheckedException e) {
+ onDone(addFailedKeys(req.keys(), e));
}
}
@@ -1080,57 +1067,30 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
* @param mappings Mappings to send.
*/
private void doUpdate(Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings) {
- UUID locNodeId = cctx.localNodeId();
-
- Collection<GridNearAtomicUpdateRequest> locUpdates = null;
-
- // Send messages to remote nodes first, then run local update.
for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
GridAtomicMappingKey mappingKey = e.getKey();
GridNearAtomicUpdateRequest req = e.getValue();
- if (locNodeId.equals(req.nodeId())) {
- if (locUpdates == null)
- locUpdates = new ArrayList<>(mappings.size());
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
- locUpdates.add(req);
+ sendRequest(mappingKey, req);
}
- else {
- try {
- if (log.isDebugEnabled())
- log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
-
- sendRequest(mappingKey, req);
- }
- catch (IgniteCheckedException ex) {
- addFailedKeys(req.keys(), ex);
-
- removeMapping(mappingKey);
- }
+ catch (IgniteCheckedException ex) {
+ addFailedKeys(req.keys(), ex);
- if (syncMode == PRIMARY_SYNC && !req.hasPrimary())
- removeMapping(mappingKey);
+ removeMapping(mappingKey);
}
+
+ if (syncMode == PRIMARY_SYNC && !req.hasPrimary())
+ removeMapping(mappingKey);
}
if (syncMode == FULL_ASYNC)
// In FULL_ASYNC mode always return (null, true).
opRes = new GridCacheReturn(cctx, true, null, true);
- if (locUpdates != null) {
- for (GridNearAtomicUpdateRequest locUpdate : locUpdates) {
- cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
- new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicUpdateRequest req,
- GridNearAtomicUpdateResponse res) {
- assert res.futureVersion().equals(futVer) : futVer;
-
- onResult(res.nodeId(), res);
- }
- });
- }
- }
-
checkComplete();
}