You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/03 10:47:06 UTC

[3/8] ignite git commit: IGNITE-2532: WIP. Only refactorings for now.

http://git-wip-us.apache.org/repos/asf/ignite/blob/29c2aee6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 8b1673f..2aa510d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -62,8 +62,8 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA
 /**
  * DHT atomic cache near update future.
  */
-public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFuture
-    {
+@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFuture {
     /** Keys */
     private Collection<?> keys;
 
@@ -79,8 +79,39 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     private Collection<GridCacheVersion> conflictRmvVals;
 
-    /** State. */
-    private final UpdateState state;
+    /** Current topology version. */
+    private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
+
+    /** */
+    private GridCacheVersion updVer;
+
+    /** Topology version when got mapping error. */
+    private AffinityTopologyVersion mapErrTopVer;
+
+    /** Mappings if operations is mapped to more than one node. */
+    @GridToStringInclude
+    private Map<UUID, GridNearAtomicUpdateRequest> mappings;
+
+    /** */
+    private int resCnt;
+
+    /** Error. */
+    private CachePartialUpdateCheckedException err;
+
+    /** Future ID. */
+    private GridCacheVersion futVer;
+
+    /** Completion future for a particular topology version. */
+    private GridFutureAdapter<Void> topCompleteFut;
+
+    /** Keys to remap. */
+    private Collection<KeyCacheObject> remapKeys;
+
+    /** Not null is operation is mapped to single node. */
+    private GridNearAtomicUpdateRequest singleReq;
+
+    /** Operation result. */
+    private GridCacheReturn opRes;    
 
     /**
      * @param cctx Cache context.
@@ -130,55 +161,22 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
         assert vals == null || vals.size() == keys.size();
         assert conflictPutVals == null || conflictPutVals.size() == keys.size();
         assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
-        assert subjId != null;
 
         this.keys = keys;
         this.vals = vals;
         this.conflictPutVals = conflictPutVals;
         this.conflictRmvVals = conflictRmvVals;
-
-        state = new UpdateState();
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheVersion version() {
-        return state.futureVersion();
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<?> keys() {
-        return keys;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onNodeLeft(UUID nodeId) {
-        state.onNodeLeft(nodeId);
-
-        return false;
-    }
-
-    /**
-     * Performs future mapping.
-     */
-    public void map() {
-        AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
-
-        if (topVer == null)
-            mapOnTopology();
-        else {
-            topLocked = true;
-
-            // Cannot remap.
-            remapCnt = 1;
-
-            state.map(topVer, null);
-        }
+    @Override public synchronized GridCacheVersion version() {
+        return futVer;
     }
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
         if (waitForPartitionExchange()) {
-            GridFutureAdapter<Void> fut = state.completeFuture(topVer);
+            GridFutureAdapter<Void> fut = completeFuture0(topVer);
 
             if (fut != null && isDone()) {
                 fut.onDone();
@@ -193,6 +191,39 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
     }
 
     /** {@inheritDoc} */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        GridNearAtomicUpdateResponse res = null;
+
+        synchronized (this) {
+            GridNearAtomicUpdateRequest req;
+
+            if (singleReq != null)
+                req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
+            else
+                req = mappings != null ? mappings.get(nodeId) : null;
+
+            if (req != null && req.response() == null) {
+                res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+                    nodeId,
+                    req.futureVersion(),
+                    cctx.deploymentEnabled());
+
+                ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
+                    "before response is received: " + nodeId);
+
+                e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+
+                res.addFailedKeys(req.keys(), e);
+            }
+        }
+
+        if (res != null)
+            onResult(nodeId, res, true);
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings("ConstantConditions")
     @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
         assert res == null || res instanceof GridCacheReturn;
@@ -207,7 +238,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
             retval = Collections.emptyMap();
 
         if (super.onDone(retval, err)) {
-            GridCacheVersion futVer = state.onFutureDone();
+            GridCacheVersion futVer = onFutureDone();
 
             if (futVer != null)
                 cctx.mvcc().removeAtomicFuture(futVer);
@@ -219,13 +250,31 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
     }
 
     /**
+     * Performs future mapping.
+     */
+    public void map() {
+        AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
+
+        if (topVer == null)
+            mapOnTopology();
+        else {
+            topLocked = true;
+
+            // Cannot remap.
+            remapCnt = 1;
+
+            map(topVer, null);
+        }
+    }
+
+    /**
      * Response callback.
      *
      * @param nodeId Node ID.
      * @param res Update response.
      */
     public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
-        state.onResult(nodeId, res, false);
+        onResult(nodeId, res, false);
     }
 
     /**
@@ -281,7 +330,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
             cache.topology().readUnlock();
         }
 
-        state.map(topVer, null);
+        map(topVer, null);
     }
 
     /**
@@ -310,7 +359,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
                     onDone(new GridCacheReturn(cctx, true, true, null, true));
             }
             catch (IgniteCheckedException e) {
-                state.onSendError(req, e);
+                onSendError(req, e);
             }
         }
     }
@@ -341,7 +390,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
                     cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
                 }
                 catch (IgniteCheckedException e) {
-                    state.onSendError(req, e);
+                    onSendError(req, e);
                 }
             }
         }
@@ -360,610 +409,422 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
     }
 
     /**
-     *
+     * @param nodeId Node ID.
+     * @param res Response.
+     * @param nodeErr {@code True} if response was created on node failure.
      */
-    private class UpdateState {
-        /** Current topology version. */
-        private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
-
-        /** */
-        private GridCacheVersion updVer;
-
-        /** Topology version when got mapping error. */
-        private AffinityTopologyVersion mapErrTopVer;
-
-        /** Mappings if operations is mapped to more than one node. */
-        @GridToStringInclude
-        private Map<UUID, GridNearAtomicUpdateRequest> mappings;
-
-        /** */
-        private int resCnt;
-
-        /** Error. */
-        private CachePartialUpdateCheckedException err;
+    @SuppressWarnings("unchecked")
+    void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+        GridNearAtomicUpdateRequest req;
 
-        /** Future ID. */
-        private GridCacheVersion futVer;
+        AffinityTopologyVersion remapTopVer = null;
 
-        /** Completion future for a particular topology version. */
-        private GridFutureAdapter<Void> topCompleteFut;
+        GridCacheReturn opRes0 = null;
+        CachePartialUpdateCheckedException err0 = null;
 
-        /** Keys to remap. */
-        private Collection<KeyCacheObject> remapKeys;
+        boolean rcvAll;
 
-        /** Not null is operation is mapped to single node. */
-        private GridNearAtomicUpdateRequest singleReq;
+        GridFutureAdapter<?> fut0 = null;
 
-        /** Operation result. */
-        private GridCacheReturn opRes;
-
-        /**
-         * @return Future version.
-         */
-        @Nullable synchronized GridCacheVersion futureVersion() {
-            return futVer;
-        }
-
-        /**
-         * @param nodeId Left node ID.
-         */
-        void onNodeLeft(UUID nodeId) {
-            GridNearAtomicUpdateResponse res = null;
-
-            synchronized (this) {
-                GridNearAtomicUpdateRequest req;
-
-                if (singleReq != null)
-                    req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
-                else
-                    req = mappings != null ? mappings.get(nodeId) : null;
+        synchronized (this) {
+            if (!res.futureVersion().equals(futVer))
+                return;
 
-                if (req != null && req.response() == null) {
-                    res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
-                        nodeId,
-                        req.futureVersion(),
-                        cctx.deploymentEnabled());
+            if (singleReq != null) {
+                if (!singleReq.nodeId().equals(nodeId))
+                    return;
 
-                    ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
-                        "before response is received: " + nodeId);
+                req = singleReq;
 
-                    e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+                singleReq = null;
 
-                    res.addFailedKeys(req.keys(), e);
-                }
+                rcvAll = true;
             }
+            else {
+                req = mappings != null ? mappings.get(nodeId) : null;
 
-            if (res != null)
-                onResult(nodeId, res, true);
-        }
-
-        /**
-         * @param nodeId Node ID.
-         * @param res Response.
-         * @param nodeErr {@code True} if response was created on node failure.
-         */
-        @SuppressWarnings("unchecked")
-        void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
-            GridNearAtomicUpdateRequest req;
-
-            AffinityTopologyVersion remapTopVer = null;
-
-            GridCacheReturn opRes0 = null;
-            CachePartialUpdateCheckedException err0 = null;
-
-            boolean rcvAll;
-
-            GridFutureAdapter<?> fut0 = null;
+                if (req != null && req.onResponse(res)) {
+                    resCnt++;
 
-            synchronized (this) {
-                if (!res.futureVersion().equals(futVer))
+                    rcvAll = mappings.size() == resCnt;
+                }
+                else
                     return;
+            }
 
-                if (singleReq != null) {
-                    if (!singleReq.nodeId().equals(nodeId))
-                        return;
+            assert req != null && req.topologyVersion().equals(topVer) : req;
 
-                    req = singleReq;
+            if (res.remapKeys() != null) {
+                assert !fastMap || cctx.kernalContext().clientNode();
 
-                    singleReq = null;
+                if (remapKeys == null)
+                    remapKeys = U.newHashSet(res.remapKeys().size());
 
-                    rcvAll = true;
-                }
-                else {
-                    req = mappings != null ? mappings.get(nodeId) : null;
+                remapKeys.addAll(res.remapKeys());
 
-                    if (req != null && req.onResponse(res)) {
-                        resCnt++;
+                if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
+                    mapErrTopVer = req.topologyVersion();
+            }
+            else if (res.error() != null) {
+                if (res.failedKeys() != null)
+                    addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
+            }
+            else {
+                if (!req.fastMap() || req.hasPrimary()) {
+                    GridCacheReturn ret = res.returnValue();
 
-                        rcvAll = mappings.size() == resCnt;
+                    if (op == TRANSFORM) {
+                        if (ret != null)
+                            addInvokeResults(ret);
                     }
                     else
-                        return;
+                        opRes = ret;
                 }
+            }
 
-                assert req != null && req.topologyVersion().equals(topVer) : req;
-
-                if (res.remapKeys() != null) {
-                    assert !fastMap || cctx.kernalContext().clientNode();
-
-                    if (remapKeys == null)
-                        remapKeys = U.newHashSet(res.remapKeys().size());
-
-                    remapKeys.addAll(res.remapKeys());
+            if (rcvAll) {
+                if (remapKeys != null) {
+                    assert mapErrTopVer != null;
 
-                    if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
-                        mapErrTopVer = req.topologyVersion();
-                }
-                else if (res.error() != null) {
-                    if (res.failedKeys() != null)
-                        addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
+                    remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1);
                 }
                 else {
-                    if (!req.fastMap() || req.hasPrimary()) {
-                        GridCacheReturn ret = res.returnValue();
-
-                        if (op == TRANSFORM) {
-                            if (ret != null)
-                                addInvokeResults(ret);
-                        }
-                        else
-                            opRes = ret;
-                    }
-                }
-
-                if (rcvAll) {
-                    if (remapKeys != null) {
-                        assert mapErrTopVer != null;
+                    if (err != null &&
+                        X.hasCause(err, CachePartialUpdateCheckedException.class) &&
+                        X.hasCause(err, ClusterTopologyCheckedException.class) &&
+                        storeFuture() &&
+                        --remapCnt > 0) {
+                        ClusterTopologyCheckedException topErr =
+                            X.cause(err, ClusterTopologyCheckedException.class);
 
-                        remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1);
-                    }
-                    else {
-                        if (err != null &&
-                            X.hasCause(err, CachePartialUpdateCheckedException.class) &&
-                            X.hasCause(err, ClusterTopologyCheckedException.class) &&
-                            storeFuture() &&
-                            --remapCnt > 0) {
-                            ClusterTopologyCheckedException topErr =
-                                X.cause(err, ClusterTopologyCheckedException.class);
+                        if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+                            CachePartialUpdateCheckedException cause =
+                                X.cause(err, CachePartialUpdateCheckedException.class);
 
-                            if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
-                                CachePartialUpdateCheckedException cause =
-                                    X.cause(err, CachePartialUpdateCheckedException.class);
+                            assert cause != null && cause.topologyVersion() != null : err;
 
-                                assert cause != null && cause.topologyVersion() != null : err;
+                            remapTopVer =
+                                new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
 
-                                remapTopVer =
-                                    new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
+                            err = null;
 
-                                err = null;
+                            Collection<Object> failedKeys = cause.failedKeys();
 
-                                Collection<Object> failedKeys = cause.failedKeys();
+                            remapKeys = new ArrayList<>(failedKeys.size());
 
-                                remapKeys = new ArrayList<>(failedKeys.size());
+                            for (Object key : failedKeys)
+                                remapKeys.add(cctx.toCacheKeyObject(key));
 
-                                for (Object key : failedKeys)
-                                    remapKeys.add(cctx.toCacheKeyObject(key));
-
-                                updVer = null;
-                            }
+                            updVer = null;
                         }
                     }
+                }
 
-                    if (remapTopVer == null) {
-                        err0 = err;
-                        opRes0 = opRes;
-                    }
-                    else {
-                        fut0 = topCompleteFut;
+                if (remapTopVer == null) {
+                    err0 = err;
+                    opRes0 = opRes;
+                }
+                else {
+                    fut0 = topCompleteFut;
 
-                        topCompleteFut = null;
+                    topCompleteFut = null;
 
-                        cctx.mvcc().removeAtomicFuture(futVer);
+                    cctx.mvcc().removeAtomicFuture(futVer);
 
-                        futVer = null;
-                        topVer = AffinityTopologyVersion.ZERO;
-                    }
+                    futVer = null;
+                    topVer = AffinityTopologyVersion.ZERO;
                 }
             }
+        }
 
-            if (res.error() != null && res.failedKeys() == null) {
-                onDone(res.error());
+        if (res.error() != null && res.failedKeys() == null) {
+            onDone(res.error());
 
-                return;
-            }
+            return;
+        }
 
-            if (rcvAll && nearEnabled) {
-                if (mappings != null) {
-                    for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
-                        GridNearAtomicUpdateResponse res0 = req0.response();
+        if (rcvAll && nearEnabled) {
+            if (mappings != null) {
+                for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
+                    GridNearAtomicUpdateResponse res0 = req0.response();
 
-                        assert res0 != null : req0;
+                    assert res0 != null : req0;
 
-                        updateNear(req0, res0);
-                    }
+                    updateNear(req0, res0);
                 }
-                else if (!nodeErr)
-                    updateNear(req, res);
             }
+            else if (!nodeErr)
+                updateNear(req, res);
+        }
 
-            if (remapTopVer != null) {
-                if (fut0 != null)
-                    fut0.onDone();
+        if (remapTopVer != null) {
+            if (fut0 != null)
+                fut0.onDone();
 
-                if (!waitTopFut) {
-                    onDone(new GridCacheTryPutFailedException());
+            if (!waitTopFut) {
+                onDone(new GridCacheTryPutFailedException());
 
-                    return;
-                }
+                return;
+            }
 
-                if (topLocked) {
-                    assert !F.isEmpty(remapKeys) : remapKeys;
+            if (topLocked) {
+                assert !F.isEmpty(remapKeys) : remapKeys;
 
-                    CachePartialUpdateCheckedException e =
-                        new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+                CachePartialUpdateCheckedException e =
+                    new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
 
-                    ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
-                        "Failed to update keys, topology changed while execute atomic update inside transaction.");
+                ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
+                    "Failed to update keys, topology changed while execute atomic update inside transaction.");
 
-                    cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+                cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
 
-                    e.add(remapKeys, cause);
+                e.add(remapKeys, cause);
 
-                    onDone(e);
+                onDone(e);
 
-                    return;
-                }
+                return;
+            }
 
-                IgniteInternalFuture<AffinityTopologyVersion> fut =
-                    cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+            IgniteInternalFuture<AffinityTopologyVersion> fut =
+                cctx.shared().exchange().affinityReadyFuture(remapTopVer);
 
-                if (fut == null)
-                    fut = new GridFinishedFuture<>(remapTopVer);
+            if (fut == null)
+                fut = new GridFinishedFuture<>(remapTopVer);
 
-                fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                    @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                        cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                            @Override public void run() {
-                                try {
-                                    AffinityTopologyVersion topVer = fut.get();
+            fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                    cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                        @Override public void run() {
+                            try {
+                                AffinityTopologyVersion topVer = fut.get();
 
-                                    map(topVer, remapKeys);
-                                }
-                                catch (IgniteCheckedException e) {
-                                    onDone(e);
-                                }
+                                map(topVer, remapKeys);
                             }
-                        });
-                    }
-                });
-
-                return;
-            }
+                            catch (IgniteCheckedException e) {
+                                onDone(e);
+                            }
+                        }
+                    });
+                }
+            });
 
-            if (rcvAll)
-                onDone(opRes0, err0);
+            return;
         }
 
-        /**
-         * @param req Request.
-         * @param e Error.
-         */
-        void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
-            synchronized (this) {
-                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
-                    req.nodeId(),
-                    req.futureVersion(),
-                    cctx.deploymentEnabled());
+        if (rcvAll)
+            onDone(opRes0, err0);
+    }
 
-                res.addFailedKeys(req.keys(), e);
+    /**
+     * @param req Request.
+     * @param e Error.
+     */
+    void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
+        synchronized (this) {
+            GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+                req.nodeId(),
+                req.futureVersion(),
+                cctx.deploymentEnabled());
 
-                onResult(req.nodeId(), res, true);
-            }
+            res.addFailedKeys(req.keys(), e);
+
+            onResult(req.nodeId(), res, true);
         }
+    }
 
-        /**
-         * @param topVer Topology version.
-         * @param remapKeys Keys to remap.
-         */
-        void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
-            Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
+    /**
+     * @param topVer Topology version.
+     * @param remapKeys Keys to remap.
+     */
+    void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
+        Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
-            if (F.isEmpty(topNodes)) {
-                onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
-                    "left the grid)."));
+        if (F.isEmpty(topNodes)) {
+            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+                "left the grid)."));
 
-                return;
-            }
+            return;
+        }
 
-            Exception err = null;
-            GridNearAtomicUpdateRequest singleReq0 = null;
-            Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
+        Exception err = null;
+        GridNearAtomicUpdateRequest singleReq0 = null;
+        Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
 
-            int size = keys.size();
+        int size = keys.size();
 
-            GridCacheVersion futVer = cctx.versions().next(topVer);
+        GridCacheVersion futVer = cctx.versions().next(topVer);
 
-            GridCacheVersion updVer;
+        GridCacheVersion updVer;
 
-            // Assign version on near node in CLOCK ordering mode even if fastMap is false.
-            if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
-                updVer = this.updVer;
+        // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+        if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
+            updVer = this.updVer;
 
-                if (updVer == null) {
-                    updVer = cctx.versions().next(topVer);
+            if (updVer == null) {
+                updVer = cctx.versions().next(topVer);
 
-                    if (log.isDebugEnabled())
-                        log.debug("Assigned fast-map version for update on near node: " + updVer);
-                }
+                if (log.isDebugEnabled())
+                    log.debug("Assigned fast-map version for update on near node: " + updVer);
             }
-            else
-                updVer = null;
+        }
+        else
+            updVer = null;
 
-            try {
-                if (size == 1 && !fastMap) {
-                    assert remapKeys == null || remapKeys.size() == 1;
+        try {
+            if (size == 1 && !fastMap) {
+                assert remapKeys == null || remapKeys.size() == 1;
 
-                    singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
-                }
+                singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+            }
+            else {
+                Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
+                    topVer,
+                    futVer,
+                    updVer,
+                    remapKeys);
+
+                if (pendingMappings.size() == 1)
+                    singleReq0 = F.firstValue(pendingMappings);
                 else {
-                    Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
-                        topVer,
-                        futVer,
-                        updVer,
-                        remapKeys);
+                    if (syncMode == PRIMARY_SYNC) {
+                        mappings0 = U.newHashMap(pendingMappings.size());
 
-                    if (pendingMappings.size() == 1)
-                        singleReq0 = F.firstValue(pendingMappings);
-                    else {
-                        if (syncMode == PRIMARY_SYNC) {
-                            mappings0 = U.newHashMap(pendingMappings.size());
-
-                            for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
-                                if (req.hasPrimary())
-                                    mappings0.put(req.nodeId(), req);
-                            }
+                        for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
+                            if (req.hasPrimary())
+                                mappings0.put(req.nodeId(), req);
                         }
-                        else
-                            mappings0 = pendingMappings;
-
-                        assert !mappings0.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this;
                     }
+                    else
+                        mappings0 = pendingMappings;
+
+                    assert !mappings0.isEmpty() || size == 0 : this;
                 }
+            }
 
-                synchronized (this) {
-                    assert this.futVer == null : this;
-                    assert this.topVer == AffinityTopologyVersion.ZERO : this;
+            synchronized (this) {
+                assert this.futVer == null : this;
+                assert this.topVer == AffinityTopologyVersion.ZERO : this;
 
-                    this.topVer = topVer;
-                    this.updVer = updVer;
-                    this.futVer = futVer;
+                this.topVer = topVer;
+                this.updVer = updVer;
+                this.futVer = futVer;
 
-                    resCnt = 0;
+                resCnt = 0;
 
-                    singleReq = singleReq0;
-                    mappings = mappings0;
+                singleReq = singleReq0;
+                mappings = mappings0;
 
-                    this.remapKeys = null;
-                }
-            }
-            catch (Exception e) {
-                err = e;
+                this.remapKeys = null;
             }
+        }
+        catch (Exception e) {
+            err = e;
+        }
 
-            if (err != null) {
-                onDone(err);
+        if (err != null) {
+            onDone(err);
 
-                return;
-            }
+            return;
+        }
 
-            if (storeFuture()) {
-                if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this)) {
-                    assert isDone() : GridNearAtomicUpdateFuture.this;
+        if (storeFuture()) {
+            if (!cctx.mvcc().addAtomicFuture(futVer, this)) {
+                assert isDone() : this;
 
-                    return;
-                }
+                return;
             }
+        }
 
-            // Optimize mapping for single key.
-            if (singleReq0 != null)
-                mapSingle(singleReq0.nodeId(), singleReq0);
-            else {
-                assert mappings0 != null;
+        // Optimize mapping for single key.
+        if (singleReq0 != null)
+            mapSingle(singleReq0.nodeId(), singleReq0);
+        else {
+            assert mappings0 != null;
 
-                if (size == 0)
-                    onDone(new GridCacheReturn(cctx, true, true, null, true));
-                else
-                    doUpdate(mappings0);
-            }
+            if (size == 0)
+                onDone(new GridCacheReturn(cctx, true, true, null, true));
+            else
+                doUpdate(mappings0);
         }
+    }
 
-        /**
-         * @param topVer Topology version.
-         * @return Future.
-         */
-        @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) {
-            if (this.topVer == AffinityTopologyVersion.ZERO)
-                return null;
-
-            if (this.topVer.compareTo(topVer) < 0) {
-                if (topCompleteFut == null)
-                    topCompleteFut = new GridFutureAdapter<>();
+    /**
+     * @param topVer Topology version.
+     * @return Future.
+     */
+    @Nullable private synchronized GridFutureAdapter<Void> completeFuture0(AffinityTopologyVersion topVer) {
+        if (this.topVer == AffinityTopologyVersion.ZERO)
+            return null;
 
-                return topCompleteFut;
-            }
+        if (this.topVer.compareTo(topVer) < 0) {
+            if (topCompleteFut == null)
+                topCompleteFut = new GridFutureAdapter<>();
 
-            return null;
+            return topCompleteFut;
         }
 
-        /**
-         * @return Future version.
-         */
-        GridCacheVersion onFutureDone() {
-            GridCacheVersion ver0;
-
-            GridFutureAdapter<Void> fut0;
+        return null;
+    }
 
-            synchronized (this) {
-                fut0 = topCompleteFut;
+    /**
+     * @return Future version.
+     */
+    private GridCacheVersion onFutureDone() {
+        GridCacheVersion ver0;
 
-                topCompleteFut = null;
+        GridFutureAdapter<Void> fut0;
 
-                ver0 = futVer;
+        synchronized (this) {
+            fut0 = topCompleteFut;
 
-                futVer = null;
-            }
+            topCompleteFut = null;
 
-            if (fut0 != null)
-                fut0.onDone();
+            ver0 = futVer;
 
-            return ver0;
+            futVer = null;
         }
 
-        /**
-         * @param topNodes Cache nodes.
-         * @param topVer Topology version.
-         * @param futVer Future version.
-         * @param updVer Update version.
-         * @param remapKeys Keys to remap.
-         * @return Mapping.
-         * @throws Exception If failed.
-         */
-        private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
-            AffinityTopologyVersion topVer,
-            GridCacheVersion futVer,
-            @Nullable GridCacheVersion updVer,
-            @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
-            Iterator<?> it = null;
-
-            if (vals != null)
-                it = vals.iterator();
-
-            Iterator<GridCacheDrInfo> conflictPutValsIt = null;
-
-            if (conflictPutVals != null)
-                conflictPutValsIt = conflictPutVals.iterator();
-
-            Iterator<GridCacheVersion> conflictRmvValsIt = null;
-
-            if (conflictRmvVals != null)
-                conflictRmvValsIt = conflictRmvVals.iterator();
-
-            Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
-
-            // Create mappings first, then send messages.
-            for (Object key : keys) {
-                if (key == null)
-                    throw new NullPointerException("Null key.");
-
-                Object val;
-                GridCacheVersion conflictVer;
-                long conflictTtl;
-                long conflictExpireTime;
-
-                if (vals != null) {
-                    val = it.next();
-                    conflictVer = null;
-                    conflictTtl = CU.TTL_NOT_CHANGED;
-                    conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
-
-                    if (val == null)
-                        throw new NullPointerException("Null value.");
-                }
-                else if (conflictPutVals != null) {
-                    GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
-
-                    val = conflictPutVal.valueEx();
-                    conflictVer = conflictPutVal.version();
-                    conflictTtl =  conflictPutVal.ttl();
-                    conflictExpireTime = conflictPutVal.expireTime();
-                }
-                else if (conflictRmvVals != null) {
-                    val = null;
-                    conflictVer = conflictRmvValsIt.next();
-                    conflictTtl = CU.TTL_NOT_CHANGED;
-                    conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
-                }
-                else {
-                    val = null;
-                    conflictVer = null;
-                    conflictTtl = CU.TTL_NOT_CHANGED;
-                    conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
-                }
-
-                if (val == null && op != GridCacheOperation.DELETE)
-                    continue;
-
-                KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+        if (fut0 != null)
+            fut0.onDone();
 
-                if (remapKeys != null && !remapKeys.contains(cacheKey))
-                    continue;
+        return ver0;
+    }
 
-                if (op != TRANSFORM)
-                    val = cctx.toCacheObject(val);
+    /**
+     * @param topNodes Cache nodes.
+     * @param topVer Topology version.
+     * @param futVer Future version.
+     * @param updVer Update version.
+     * @param remapKeys Keys to remap.
+     * @return Mapping.
+     * @throws Exception If failed.
+     */
+    private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
+        AffinityTopologyVersion topVer,
+        GridCacheVersion futVer,
+        @Nullable GridCacheVersion updVer,
+        @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
+        Iterator<?> it = null;
 
-                Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
+        if (vals != null)
+            it = vals.iterator();
 
-                if (affNodes.isEmpty())
-                    throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                        "(all partition nodes left the grid).");
+        Iterator<GridCacheDrInfo> conflictPutValsIt = null;
 
-                int i = 0;
-
-                for (ClusterNode affNode : affNodes) {
-                    if (affNode == null)
-                        throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                            "(all partition nodes left the grid).");
-
-                    UUID nodeId = affNode.id();
-
-                    GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
-
-                    if (mapped == null) {
-                        mapped = new GridNearAtomicUpdateRequest(
-                            cctx.cacheId(),
-                            nodeId,
-                            futVer,
-                            fastMap,
-                            updVer,
-                            topVer,
-                            topLocked,
-                            syncMode,
-                            op,
-                            retval,
-                            expiryPlc,
-                            invokeArgs,
-                            filter,
-                            subjId,
-                            taskNameHash,
-                            skipStore,
-                            keepBinary,
-                            cctx.kernalContext().clientNode(),
-                            cctx.deploymentEnabled(),
-                            keys.size());
-
-                        pendingMappings.put(nodeId, mapped);
-                    }
+        if (conflictPutVals != null)
+            conflictPutValsIt = conflictPutVals.iterator();
 
-                    mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
+        Iterator<GridCacheVersion> conflictRmvValsIt = null;
 
-                    i++;
-                }
-            }
+        if (conflictRmvVals != null)
+            conflictRmvValsIt = conflictRmvVals.iterator();
 
-            return pendingMappings;
-        }
+        Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
 
-        /**
-         * @param topVer Topology version.
-         * @param futVer Future version.
-         * @param updVer Update version.
-         * @return Request.
-         * @throws Exception If failed.
-         */
-        private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
-            GridCacheVersion futVer,
-            @Nullable GridCacheVersion updVer) throws Exception {
-            Object key = F.first(keys);
+        // Create mappings first, then send messages.
+        for (Object key : keys) {
+            if (key == null)
+                throw new NullPointerException("Null key.");
 
             Object val;
             GridCacheVersion conflictVer;
@@ -971,127 +832,231 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
             long conflictExpireTime;
 
             if (vals != null) {
-                // Regular PUT.
-                val = F.first(vals);
+                val = it.next();
                 conflictVer = null;
                 conflictTtl = CU.TTL_NOT_CHANGED;
                 conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+
+                if (val == null)
+                    throw new NullPointerException("Null value.");
             }
             else if (conflictPutVals != null) {
-                // Conflict PUT.
-                GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
+                GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
 
                 val = conflictPutVal.valueEx();
                 conflictVer = conflictPutVal.version();
-                conflictTtl = conflictPutVal.ttl();
+                conflictTtl =  conflictPutVal.ttl();
                 conflictExpireTime = conflictPutVal.expireTime();
             }
             else if (conflictRmvVals != null) {
-                // Conflict REMOVE.
                 val = null;
-                conflictVer = F.first(conflictRmvVals);
+                conflictVer = conflictRmvValsIt.next();
                 conflictTtl = CU.TTL_NOT_CHANGED;
                 conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
             }
             else {
-                // Regular REMOVE.
                 val = null;
                 conflictVer = null;
                 conflictTtl = CU.TTL_NOT_CHANGED;
                 conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
             }
 
-            // We still can get here if user pass map with single element.
-            if (key == null)
-                throw new NullPointerException("Null key.");
-
             if (val == null && op != GridCacheOperation.DELETE)
-                throw new NullPointerException("Null value.");
+                continue;
 
             KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
 
+            if (remapKeys != null && !remapKeys.contains(cacheKey))
+                continue;
+
             if (op != TRANSFORM)
                 val = cctx.toCacheObject(val);
 
-            ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
-
-            if (primary == null)
-                throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
-                    "left the grid).");
-
-            GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
-                cctx.cacheId(),
-                primary.id(),
-                futVer,
-                fastMap,
-                updVer,
-                topVer,
-                topLocked,
-                syncMode,
-                op,
-                retval,
-                expiryPlc,
-                invokeArgs,
-                filter,
-                subjId,
-                taskNameHash,
-                skipStore,
-                keepBinary,
-                cctx.kernalContext().clientNode(),
-                cctx.deploymentEnabled(),
-                1);
-
-            req.addUpdateEntry(cacheKey,
-                val,
-                conflictTtl,
-                conflictExpireTime,
-                conflictVer,
-                true);
-
-            return req;
-        }
+            Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
 
-        /**
-         * @param ret Result from single node.
-         */
-        @SuppressWarnings("unchecked")
-        private void addInvokeResults(GridCacheReturn ret) {
-            assert op == TRANSFORM : op;
-            assert ret.value() == null || ret.value() instanceof Map : ret.value();
-
-            if (ret.value() != null) {
-                if (opRes != null)
-                    opRes.mergeEntryProcessResults(ret);
-                else
-                    opRes = ret;
-            }
-        }
+            if (affNodes.isEmpty())
+                throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+                    "(all partition nodes left the grid).");
+
+            int i = 0;
 
-        /**
-         * @param failedKeys Failed keys.
-         * @param topVer Topology version for failed update.
-         * @param err Error cause.
-         */
-        private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
-            AffinityTopologyVersion topVer,
-            Throwable err) {
-            CachePartialUpdateCheckedException err0 = this.err;
+            for (ClusterNode affNode : affNodes) {
+                if (affNode == null)
+                    throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+                        "(all partition nodes left the grid).");
 
-            if (err0 == null)
-                err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+                UUID nodeId = affNode.id();
 
-            Collection<Object> keys = new ArrayList<>(failedKeys.size());
+                GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
+
+                if (mapped == null) {
+                    mapped = new GridNearAtomicUpdateRequest(
+                        cctx.cacheId(),
+                        nodeId,
+                        futVer,
+                        fastMap,
+                        updVer,
+                        topVer,
+                        topLocked,
+                        syncMode,
+                        op,
+                        retval,
+                        expiryPlc,
+                        invokeArgs,
+                        filter,
+                        subjId,
+                        taskNameHash,
+                        skipStore,
+                        keepBinary,
+                        cctx.kernalContext().clientNode(),
+                        cctx.deploymentEnabled(),
+                        keys.size());
+
+                    pendingMappings.put(nodeId, mapped);
+                }
 
-            for (KeyCacheObject key : failedKeys)
-                keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
+                mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
 
-            err0.add(keys, err, topVer);
+                i++;
+            }
         }
 
-        /** {@inheritDoc} */
-        @Override public synchronized  String toString() {
-            return S.toString(UpdateState.class, this);
+        return pendingMappings;
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @param futVer Future version.
+     * @param updVer Update version.
+     * @return Request.
+     * @throws Exception If failed.
+     */
+    private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
+        GridCacheVersion futVer,
+        @Nullable GridCacheVersion updVer) throws Exception {
+        Object key = F.first(keys);
+
+        Object val;
+        GridCacheVersion conflictVer;
+        long conflictTtl;
+        long conflictExpireTime;
+
+        if (vals != null) {
+            // Regular PUT.
+            val = F.first(vals);
+            conflictVer = null;
+            conflictTtl = CU.TTL_NOT_CHANGED;
+            conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+        }
+        else if (conflictPutVals != null) {
+            // Conflict PUT.
+            GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
+
+            val = conflictPutVal.valueEx();
+            conflictVer = conflictPutVal.version();
+            conflictTtl = conflictPutVal.ttl();
+            conflictExpireTime = conflictPutVal.expireTime();
+        }
+        else if (conflictRmvVals != null) {
+            // Conflict REMOVE.
+            val = null;
+            conflictVer = F.first(conflictRmvVals);
+            conflictTtl = CU.TTL_NOT_CHANGED;
+            conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
         }
+        else {
+            // Regular REMOVE.
+            val = null;
+            conflictVer = null;
+            conflictTtl = CU.TTL_NOT_CHANGED;
+            conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+        }
+
+        // We still can get here if user pass map with single element.
+        if (key == null)
+            throw new NullPointerException("Null key.");
+
+        if (val == null && op != GridCacheOperation.DELETE)
+            throw new NullPointerException("Null value.");
+
+        KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+
+        if (op != TRANSFORM)
+            val = cctx.toCacheObject(val);
+
+        ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
+
+        if (primary == null)
+            throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+                "left the grid).");
+
+        GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
+            cctx.cacheId(),
+            primary.id(),
+            futVer,
+            fastMap,
+            updVer,
+            topVer,
+            topLocked,
+            syncMode,
+            op,
+            retval,
+            expiryPlc,
+            invokeArgs,
+            filter,
+            subjId,
+            taskNameHash,
+            skipStore,
+            keepBinary,
+            cctx.kernalContext().clientNode(),
+            cctx.deploymentEnabled(),
+            1);
+
+        req.addUpdateEntry(cacheKey,
+            val,
+            conflictTtl,
+            conflictExpireTime,
+            conflictVer,
+            true);
+
+        return req;
+    }
+
+    /**
+     * @param ret Result from single node.
+     */
+    @SuppressWarnings("unchecked")
+    private void addInvokeResults(GridCacheReturn ret) {
+        assert op == TRANSFORM : op;
+        assert ret.value() == null || ret.value() instanceof Map : ret.value();
+
+        if (ret.value() != null) {
+            if (opRes != null)
+                opRes.mergeEntryProcessResults(ret);
+            else
+                opRes = ret;
+        }
+    }
+
+    /**
+     * @param failedKeys Failed keys.
+     * @param topVer Topology version for failed update.
+     * @param err Error cause.
+     */
+    private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
+        AffinityTopologyVersion topVer,
+        Throwable err) {
+        CachePartialUpdateCheckedException err0 = this.err;
+
+        if (err0 == null)
+            err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+
+        Collection<Object> keys = new ArrayList<>(failedKeys.size());
+
+        for (KeyCacheObject key : failedKeys)
+            keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
+
+        err0.add(keys, err, topVer);
     }
 
     /** {@inheritDoc} */