You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/03 10:47:05 UTC

[2/8] ignite git commit: IGNITE-2532: WIP. Only refactorings for now.

IGNITE-2532: WIP. Only refactorings for now.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/46652832
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/46652832
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/46652832

Branch: refs/heads/ignite-2523
Commit: 466528329942f009c9e591a16b9453b95830b0cb
Parents: e6acce6
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Feb 3 10:24:01 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 3 10:24:01 2016 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |    6 +
 .../processors/cache/GridCacheIoManager.java    |   17 +
 .../dht/atomic/GridDhtAtomicCache.java          |   42 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   12 +-
 .../GridNearAbstractAtomicUpdateFuture.java     |  248 ++++
 .../GridNearAtomicSingleUpdateFuture.java       | 1264 ++++++++++++++++++
 .../GridNearAtomicSingleUpdateRequest.java      | 1044 +++++++++++++++
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  186 +--
 .../dht/atomic/GridNearAtomicUpdateRequest.java |    3 +-
 .../GridNearAtomicUpdateRequestInterface.java   |  101 ++
 .../distributed/near/GridNearAtomicCache.java   |    4 +-
 11 files changed, 2725 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 3c7f378..25e07b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -68,6 +68,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlock
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
@@ -726,6 +727,11 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case -23:
+                msg = new GridNearAtomicSingleUpdateRequest();
+
+                break;
+
             // [-3..119] [124] - this
             // [120..123] - DR
             // [-4..-22] - SQL

http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index b297827..57545af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
@@ -586,6 +587,22 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             break;
 
+            case -23: {
+                GridNearAtomicSingleUpdateRequest req = (GridNearAtomicSingleUpdateRequest)msg;
+
+                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
+                    ctx.cacheId(),
+                    nodeId,
+                    req.futureVersion(),
+                    ctx.deploymentEnabled());
+
+                res.error(req.classError());
+
+                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+            }
+
+            break;
+
             default:
                 throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
                     + msg + "]", msg.classError());

http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index f6f57ee..cdaa061 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -139,7 +139,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500);
 
     /** Update reply closure. */
-    private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
+    private CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> updateReplyClos;
 
     /** Pending  */
     private ConcurrentMap<UUID, DeferredResponseBuffer> pendingResponses = new ConcurrentHashMap8<>();
@@ -192,9 +192,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         });
 
-        updateReplyClos = new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+        updateReplyClos = new CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse>() {
             @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-            @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+            @Override public void apply(GridNearAtomicUpdateRequestInterface req, GridNearAtomicUpdateResponse res) {
                 if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
                     assert req.writeSynchronizationMode() != FULL_ASYNC : req;
 
@@ -256,6 +256,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         });
 
+        ctx.io().addHandler(ctx.cacheId(), GridNearAtomicSingleUpdateRequest.class, new CI2<UUID, GridNearAtomicSingleUpdateRequest>() {
+            @Override public void apply(UUID nodeId, GridNearAtomicSingleUpdateRequest req) {
+                processNearAtomicUpdateRequest(nodeId, req);
+            }
+        });
+
         ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() {
             @Override public void apply(UUID nodeId, GridNearAtomicUpdateResponse res) {
                 processNearAtomicUpdateResponse(nodeId, res);
@@ -1304,8 +1310,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     public void updateAllAsyncInternal(
         final UUID nodeId,
-        final GridNearAtomicUpdateRequest req,
-        final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+        final GridNearAtomicUpdateRequestInterface req,
+        final CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb
     ) {
         IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion());
 
@@ -1329,8 +1335,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     public void updateAllAsyncInternal0(
         UUID nodeId,
-        GridNearAtomicUpdateRequest req,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+        GridNearAtomicUpdateRequestInterface req,
+        CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb
     ) {
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
             ctx.deploymentEnabled());
@@ -1552,12 +1558,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     private UpdateBatchResult updateWithBatch(
         ClusterNode node,
         boolean hasNear,
-        GridNearAtomicUpdateRequest req,
+        GridNearAtomicUpdateRequestInterface req,
         GridNearAtomicUpdateResponse res,
         List<GridDhtCacheEntry> locked,
         GridCacheVersion ver,
         @Nullable GridDhtAtomicUpdateFuture dhtFut,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb,
         boolean replicate,
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiry,
@@ -1968,12 +1974,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     private UpdateSingleResult updateSingle(
         ClusterNode node,
         boolean hasNear,
-        GridNearAtomicUpdateRequest req,
+        GridNearAtomicUpdateRequestInterface req,
         GridNearAtomicUpdateResponse res,
         List<GridDhtCacheEntry> locked,
         GridCacheVersion ver,
         @Nullable GridDhtAtomicUpdateFuture dhtFut,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb,
         boolean replicate,
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiry,
@@ -2208,8 +2214,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         @Nullable Collection<KeyCacheObject> rmvKeys,
         @Nullable Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap,
         @Nullable GridDhtAtomicUpdateFuture dhtFut,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
-        final GridNearAtomicUpdateRequest req,
+        CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb,
+        final GridNearAtomicUpdateRequestInterface req,
         final GridNearAtomicUpdateResponse res,
         boolean replicate,
         UpdateBatchResult batchRes,
@@ -2587,7 +2593,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      *      will return false.
      * @return {@code True} if filter evaluation succeeded.
      */
-    private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicUpdateRequest req,
+    private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicUpdateRequestInterface req,
         GridNearAtomicUpdateResponse res) {
         try {
             return ctx.isAllLocked(entry, req.filter());
@@ -2602,7 +2608,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     /**
      * @param req Request to remap.
      */
-    private void remapToNewPrimary(GridNearAtomicUpdateRequest req) {
+    private void remapToNewPrimary(GridNearAtomicUpdateRequestInterface req) {
         assert req.writeSynchronizationMode() == FULL_ASYNC : req;
 
         if (log.isDebugEnabled())
@@ -2681,9 +2687,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     @Nullable private GridDhtAtomicUpdateFuture createDhtFuture(
         GridCacheVersion writeVer,
-        GridNearAtomicUpdateRequest updateReq,
+        GridNearAtomicUpdateRequestInterface updateReq,
         GridNearAtomicUpdateResponse updateRes,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb,
         boolean force
     ) {
         if (!force) {
@@ -2714,7 +2720,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param nodeId Sender node ID.
      * @param req Near atomic update request.
      */
-    private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicUpdateRequest req) {
+    private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicUpdateRequestInterface req) {
         if (log.isDebugEnabled())
             log.debug("Processing near atomic update request [nodeId=" + nodeId + ", req=" + req + ']');
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index e31af19..6891d3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -53,8 +53,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 /**
  * DHT atomic cache backup update future.
  */
-public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
-    implements GridCacheAtomicFuture<Void> {
+public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implements GridCacheAtomicFuture<Void> {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -78,7 +77,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
     /** Completion callback. */
     @GridToStringExclude
-    private final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
+    private final CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb;
 
     /** Mappings. */
     @GridToStringInclude
@@ -88,7 +87,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
 
     /** Update request. */
-    private final GridNearAtomicUpdateRequest updateReq;
+    private final GridNearAtomicUpdateRequestInterface updateReq;
 
     /** Update response. */
     private final GridNearAtomicUpdateResponse updateRes;
@@ -111,10 +110,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
      */
     public GridDhtAtomicUpdateFuture(
         GridCacheContext cctx,
-        CI2<GridNearAtomicUpdateRequest,
-        GridNearAtomicUpdateResponse> completionCb,
+        CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb,
         GridCacheVersion writeVer,
-        GridNearAtomicUpdateRequest updateReq,
+        GridNearAtomicUpdateRequestInterface updateReq,
         GridNearAtomicUpdateResponse updateRes
     ) {
         this.cctx = cctx;

http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
new file mode 100644
index 0000000..60e0c5f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
+import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.expiry.ExpiryPolicy;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+
+/**
+ * Base class for near atomic update futures.
+ */
+public abstract class GridNearAbstractAtomicUpdateFuture extends GridFutureAdapter<Object>
+    implements GridCacheAtomicFuture<Object> {
+    /** Logger reference. */
+    protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** Logger. */
+    protected static IgniteLogger log;
+
+    /** Optional arguments for entry processor. */
+    protected Object[] invokeArgs;
+
+    /** Cache context. */
+    protected final GridCacheContext cctx;
+
+    /** Cache. */
+    protected final GridDhtAtomicCache cache;
+
+    /** Update operation. */
+    protected final GridCacheOperation op;
+
+    /** Return value require flag. */
+    protected final boolean retval;
+
+    /** Expiry policy. */
+    protected final ExpiryPolicy expiryPlc;
+
+    /** Optional filter. */
+    protected final CacheEntryPredicate[] filter;
+
+    /** Write synchronization mode. */
+    protected final CacheWriteSynchronizationMode syncMode;
+
+    /** Raw return value flag. */
+    protected final boolean rawRetval;
+
+    /** Fast map flag. */
+    protected final boolean fastMap;
+
+    /** Near cache flag. */
+    protected final boolean nearEnabled;
+
+    /** Subject ID. */
+    protected final UUID subjId;
+
+    /** Task name hash. */
+    protected final int taskNameHash;
+
+    /** Skip store flag. */
+    protected final boolean skipStore;
+
+    /** Keep binary flag. */
+    protected final boolean keepBinary;
+
+    /** Wait for topology future flag. */
+    protected final boolean waitTopFut;
+
+    /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */
+    protected boolean topLocked;
+
+    /** Remap count. */
+    protected int remapCnt;
+
+    /**
+     * @param cctx Cache context.
+     * @param cache Cache instance.
+     * @param syncMode Write synchronization mode.
+     * @param op Update operation.
+     * @param invokeArgs Optional arguments for entry processor.
+     * @param retval Return value required flag.
+     * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
+     * @param expiryPlc Expiry policy explicitly specified for cache operation.
+     * @param filter Entry filter.
+     * @param subjId Subject ID, asserted to be non-{@code null}.
+     * @param taskNameHash Task name hash code.
+     * @param skipStore Skip store flag.
+     * @param keepBinary Keep binary flag.
+     * @param remapCnt Maximum number of retries; forced to {@code 1} when {@code waitTopFut} is {@code false}.
+     * @param waitTopFut If {@code false} does not wait for affinity change future.
+     */
+    public GridNearAbstractAtomicUpdateFuture(
+        GridCacheContext cctx,
+        GridDhtAtomicCache cache,
+        CacheWriteSynchronizationMode syncMode,
+        GridCacheOperation op,
+        @Nullable Object[] invokeArgs,
+        final boolean retval,
+        final boolean rawRetval,
+        @Nullable ExpiryPolicy expiryPlc,
+        final CacheEntryPredicate[] filter,
+        UUID subjId,
+        int taskNameHash,
+        boolean skipStore,
+        boolean keepBinary,
+        int remapCnt,
+        boolean waitTopFut
+    ) {
+        this.rawRetval = rawRetval;
+
+        assert subjId != null;
+
+        this.cctx = cctx;
+        this.cache = cache;
+        this.syncMode = syncMode;
+        this.op = op;
+        this.invokeArgs = invokeArgs;
+        this.retval = retval;
+        this.expiryPlc = expiryPlc;
+        this.filter = filter;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.skipStore = skipStore;
+        this.keepBinary = keepBinary;
+        this.waitTopFut = waitTopFut;
+
+        if (log == null)
+            log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
+
+        fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
+            cctx.config().getAtomicWriteOrderMode() == CLOCK &&
+            !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
+
+        nearEnabled = CU.isNearEnabled(cctx);
+
+        if (!waitTopFut)
+            remapCnt = 1;
+
+        this.remapCnt = remapCnt;
+    }
+
+    /** {@inheritDoc} This implementation always throws {@link UnsupportedOperationException}. */
+    @Override public IgniteUuid futureId() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} This implementation always returns {@code true}. */
+    @Override public boolean trackable() {
+        return true;
+    }
+
+    /** {@inheritDoc} This implementation does nothing. */
+    @Override public void markNotTrackable() {
+        // No-op.
+    }
+
+    /**
+     * @return {@code True} if this future should block partition map exchange, i.e. the update is fast-mapped.
+     */
+    protected boolean waitForPartitionExchange() {
+        // Wait fast-map near atomic update futures in CLOCK mode.
+        return fastMap;
+    }
+
+    /**
+     * Updates near cache. Does nothing if the response has remap keys or the request has no primary.
+     *
+     * @param req Update request.
+     * @param res Update response.
+     */
+    protected void updateNear(GridNearAtomicUpdateRequestInterface req, GridNearAtomicUpdateResponse res) {
+        assert nearEnabled;
+
+        if (res.remapKeys() != null || !req.hasPrimary())
+            return;
+
+        GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
+
+        near.processNearAtomicUpdateResponse(req, res);
+    }
+
+    /**
+     * @return {@code True} if future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
+     */
+    protected boolean storeFuture() {
+        return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
+    }
+
+    /**
+     * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
+     * node and send updates in parallel to all participating nodes.
+     *
+     * @param key Key to map.
+     * @param topVer Topology version to map.
+     * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
+     * @return Topology nodes of the key's partition if {@code fastMap}, otherwise a singleton with the primary node.
+     */
+    protected Collection<ClusterNode> mapKey(KeyCacheObject key, AffinityTopologyVersion topVer, boolean fastMap
+    ) {
+        GridCacheAffinityManager affMgr = cctx.affinity();
+
+        // If we can send updates in parallel - do it.
+        return fastMap ?
+            cctx.topology().nodes(affMgr.partition(key), topVer) :
+            Collections.singletonList(affMgr.primary(key, topVer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
new file mode 100644
index 0000000..d633e47
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -0,0 +1,1264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
+import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.expiry.ExpiryPolicy;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+
+/**
+ * DHT atomic cache single near update future.
+ */
+public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object>
+    implements GridCacheAtomicFuture<Object>{
+    /** Logger reference. */
+    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** Logger (lazily initialized by the constructor). */
+    protected static IgniteLogger log;
+
+    /** Cache context. */
+    private final GridCacheContext cctx;
+
+    /** Cache. */
+    private GridDhtAtomicCache cache;
+
+    /** Update operation. */
+    private final GridCacheOperation op;
+
+    /** Keys. */
+    private Collection<?> keys;
+
+    /** Values. */
+    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+    private Collection<?> vals;
+
+    /** Optional arguments for entry processor. */
+    private Object[] invokeArgs;
+
+    /** Conflict put values. */
+    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+    private Collection<GridCacheDrInfo> conflictPutVals;
+
+    /** Conflict remove values. */
+    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+    private Collection<GridCacheVersion> conflictRmvVals;
+
+    /** Return value require flag. */
+    private final boolean retval;
+
+    /** Expiry policy. */
+    private final ExpiryPolicy expiryPlc;
+
+    /** Optional filter. */
+    private final CacheEntryPredicate[] filter;
+
+    /** Write synchronization mode. */
+    private final CacheWriteSynchronizationMode syncMode;
+
+    /** Raw return value flag ({@code true} if {@code GridCacheReturn} is returned as future result). */
+    private final boolean rawRetval;
+
+    /** Fast map flag. */
+    private final boolean fastMap;
+
+    /** Near cache flag. */
+    private final boolean nearEnabled;
+
+    /** Subject ID. */
+    private final UUID subjId;
+
+    /** Task name hash. */
+    private final int taskNameHash;
+
+    /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */
+    private boolean topLocked;
+
+    /** Skip store flag. */
+    private final boolean skipStore;
+
+    /** Keep binary flag. */
+    private final boolean keepBinary;
+
+    /** Wait for topology future flag. */
+    private final boolean waitTopFut;
+
+    /** Remap count (remaining number of mapping attempts). */
+    private int remapCnt;
+
+    /** Update state. */
+    private final UpdateState state;
+
+    /**
+     * Creates the future. Nothing is sent until {@link #map()} is called.
+     *
+     * @param cctx Cache context.
+     * @param cache Cache instance.
+     * @param syncMode Write synchronization mode.
+     * @param op Update operation.
+     * @param keys Keys to update.
+     * @param vals Values or transform closure.
+     * @param invokeArgs Optional arguments for entry processor.
+     * @param conflictPutVals Conflict put values (optional).
+     * @param conflictRmvVals Conflict remove values (optional).
+     * @param retval Return value require flag.
+     * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
+     * @param expiryPlc Expiry policy explicitly specified for cache operation.
+     * @param filter Entry filter.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param skipStore Skip store flag.
+     * @param keepBinary Keep binary flag.
+     * @param remapCnt Maximum number of retries.
+     * @param waitTopFut If {@code false} does not wait for affinity change future.
+     */
+    public GridNearAtomicSingleUpdateFuture(
+        GridCacheContext cctx,
+        GridDhtAtomicCache cache,
+        CacheWriteSynchronizationMode syncMode,
+        GridCacheOperation op,
+        Collection<?> keys,
+        @Nullable Collection<?> vals,
+        @Nullable Object[] invokeArgs,
+        @Nullable Collection<GridCacheDrInfo> conflictPutVals,
+        @Nullable Collection<GridCacheVersion> conflictRmvVals,
+        final boolean retval,
+        final boolean rawRetval,
+        @Nullable ExpiryPolicy expiryPlc,
+        final CacheEntryPredicate[] filter,
+        UUID subjId,
+        int taskNameHash,
+        boolean skipStore,
+        boolean keepBinary,
+        int remapCnt,
+        boolean waitTopFut
+    ) {
+        // Optional per-key collections, when given, must be aligned with the keys.
+        assert vals == null || vals.size() == keys.size();
+        assert conflictPutVals == null || conflictPutVals.size() == keys.size();
+        assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
+        assert subjId != null;
+
+        // Context.
+        this.cctx = cctx;
+        this.cache = cache;
+
+        // Operation payload.
+        this.op = op;
+        this.keys = keys;
+        this.vals = vals;
+        this.invokeArgs = invokeArgs;
+        this.conflictPutVals = conflictPutVals;
+        this.conflictRmvVals = conflictRmvVals;
+        this.filter = filter;
+        this.expiryPlc = expiryPlc;
+
+        // Operation flags.
+        this.syncMode = syncMode;
+        this.retval = retval;
+        this.rawRetval = rawRetval;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.skipStore = skipStore;
+        this.keepBinary = keepBinary;
+        this.waitTopFut = waitTopFut;
+
+        if (log == null)
+            log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
+
+        fastMap = F.isEmpty(filter) &&
+            op != TRANSFORM &&
+            cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
+            cctx.config().getAtomicWriteOrderMode() == CLOCK &&
+            (!cctx.writeThrough() || cctx.config().getInterceptor() == null);
+
+        nearEnabled = CU.isNearEnabled(cctx);
+
+        // A single attempt only when the caller does not want to wait for topology.
+        this.remapCnt = waitTopFut ? remapCnt : 1;
+
+        state = new UpdateState();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Not supported by this future: the method always throws {@link UnsupportedOperationException}.
+     */
+    @Override public IgniteUuid futureId() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns the future version currently held by the update state; the state accessor is declared
+     * {@code Nullable}, so the result may be {@code null}.
+     */
+    @Override public GridCacheVersion version() {
+        return state.futureVersion();
+    }
+
+    /**
+     * Tells whether partition map exchange has to wait for this future. The answer is simply the fast-map flag
+     * computed in the constructor.
+     *
+     * @return {@code True} if this future should block partition map exchange.
+     */
+    private boolean waitForPartitionExchange() {
+        // Wait fast-map near atomic update futures in CLOCK mode.
+        return fastMap;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns the {@code keys} field as is, without copying.
+     */
+    @Override public Collection<?> keys() {
+        return keys;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Forwards the event to the update state and always returns {@code false}.
+     */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        state.onNodeLeft(nodeId);
+
+        return false;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This implementation always returns {@code true}.
+     */
+    @Override public boolean trackable() {
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This implementation does nothing.
+     */
+    @Override public void markNotTrackable() {
+        // No-op.
+    }
+
+    /**
+     * Performs future mapping.
+     */
+    public void map() {
+        AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
+
+        if (topVer == null)
+            mapOnTopology();
+        else {
+            topLocked = true;
+
+            // Cannot remap.
+            remapCnt = 1;
+
+            state.map(topVer, null);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Only futures that block partition map exchange produce a completion future. If this future is already done,
+     * the completion future obtained from the state is finished immediately and {@code null} is returned.
+     */
+    @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+        if (!waitForPartitionExchange())
+            return null;
+
+        GridFutureAdapter<Void> completeFut = state.completeFuture(topVer);
+
+        // Either nothing to wait for, or the update is still in progress.
+        if (completeFut == null || !isDone())
+            return completeFut;
+
+        completeFut.onDone();
+
+        return null;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Converts the internal {@link GridCacheReturn} into the value exposed by the future (raw return object,
+     * unwrapped value or success flag), substitutes an empty map for a {@code null} TRANSFORM result and, if this
+     * call actually completed the future, removes it from the MVCC manager when the state reports a future version.
+     */
+    @SuppressWarnings("ConstantConditions")
+    @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
+        assert res == null || res instanceof GridCacheReturn;
+
+        GridCacheReturn cacheRet = (GridCacheReturn)res;
+
+        Object futRes;
+
+        if (res == null)
+            futRes = null;
+        else if (rawRetval)
+            futRes = cacheRet;
+        else if (this.retval || op == TRANSFORM)
+            futRes = cctx.unwrapBinaryIfNeeded(cacheRet.value(), keepBinary);
+        else
+            futRes = cacheRet.success();
+
+        if (op == TRANSFORM && futRes == null)
+            futRes = Collections.emptyMap();
+
+        if (!super.onDone(futRes, err))
+            return false;
+
+        GridCacheVersion doneVer = state.onFutureDone();
+
+        if (doneVer != null)
+            cctx.mvcc().removeAtomicFuture(doneVer);
+
+        return true;
+    }
+
+    /**
+     * Response callback. Passes the response to the update state, marking it as not created on node failure.
+     *
+     * @param nodeId Node ID.
+     * @param res Update response.
+     */
+    public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
+        state.onResult(nodeId, res, false);
+    }
+
+    /**
+     * Updates near cache. Nothing is done when the response carries keys to remap or the request has no primary
+     * keys.
+     *
+     * @param req Update request.
+     * @param res Update response.
+     */
+    private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+        assert nearEnabled;
+
+        boolean skip = res.remapKeys() != null || !req.hasPrimary();
+
+        if (!skip) {
+            GridNearAtomicCache nearCache = (GridNearAtomicCache)cctx.dht().near();
+
+            nearCache.processNearAtomicUpdateResponse(req, res);
+        }
+    }
+
+    /**
+     * Maps future on ready topology.
+     * <p>
+     * Under the topology read lock the method fails the future if the cache topology is stopping, otherwise it
+     * inspects the current topology version future. When that future is done, its cache validation error (if any)
+     * completes this future, else its topology version is used for mapping after the lock is released. When it is
+     * not done, the method either re-runs itself once the topology future completes ({@code waitTopFut} is set) or
+     * fails this future with {@link GridCacheTryPutFailedException}.
+     */
+    private void mapOnTopology() {
+        cache.topology().readLock();
+
+        AffinityTopologyVersion topVer = null;
+
+        try {
+            if (cache.topology().stopping()) {
+                onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+                    cache.name()));
+
+                return;
+            }
+
+            GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
+
+            if (fut.isDone()) {
+                Throwable err = fut.validateCache(cctx);
+
+                if (err != null) {
+                    onDone(err);
+
+                    return;
+                }
+
+                topVer = fut.topologyVersion();
+            }
+            else {
+                if (waitTopFut) {
+                    assert !topLocked : this;
+
+                    fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                                @Override public void run() {
+                                    mapOnTopology();
+                                }
+                            });
+                        }
+                    });
+                }
+                else
+                    onDone(new GridCacheTryPutFailedException());
+
+                return;
+            }
+        }
+        finally {
+            cache.topology().readUnlock();
+        }
+
+        state.map(topVer, null);
+    }
+
+    /**
+     * Tells whether this future has to be stored by the MVCC manager.
+     *
+     * @return {@code True} if future is stored by {@link GridCacheMvccManager#addAtomicFuture}: either the cache
+     *      uses {@code CLOCK} atomic write order mode or the synchronization mode is not {@code FULL_ASYNC}.
+     */
+    private boolean storeFuture() {
+        if (cctx.config().getAtomicWriteOrderMode() == CLOCK)
+            return true;
+
+        return syncMode != FULL_ASYNC;
+    }
+
+    /**
+     * Resolves the nodes an update for the given key has to be sent to. If filters are absent and operation is not
+     * TRANSFORM, then version can be assigned on near node and updates sent in parallel to all participating nodes.
+     *
+     * @param key Key to map.
+     * @param topVer Topology version to map.
+     * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
+     * @return Nodes returned by the topology for the key partition when {@code fastMap} is set, otherwise a
+     *      singleton list holding the primary node for the key.
+     */
+    private Collection<ClusterNode> mapKey(KeyCacheObject key, AffinityTopologyVersion topVer, boolean fastMap) {
+        GridCacheAffinityManager aff = cctx.affinity();
+
+        // Regular update goes to the primary node only.
+        if (!fastMap)
+            return Collections.singletonList(aff.primary(key, topVer));
+
+        // If we can send updates in parallel - do it.
+        return cctx.topology().nodes(aff.partition(key), topVer);
+    }
+
+    /**
+     * Maps future to single node: a request for the local node is processed by the cache directly, any other
+     * request is sent over the network. In {@code FULL_ASYNC} mode the future is completed right after a
+     * successful remote send.
+     *
+     * @param nodeId Node ID.
+     * @param req Request.
+     */
+    private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
+        if (!cctx.localNodeId().equals(nodeId)) {
+            try {
+                if (log.isDebugEnabled())
+                    log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
+
+                cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+
+                if (syncMode == FULL_ASYNC)
+                    onDone(new GridCacheReturn(cctx, true, true, null, true));
+            }
+            catch (IgniteCheckedException e) {
+                state.onSendError(req, e);
+            }
+
+            return;
+        }
+
+        // Local node: run the update in place and feed the response back to this future.
+        cache.updateAllAsyncInternal(nodeId, req,
+            new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+                @Override public void apply(GridNearAtomicUpdateRequest locReq, GridNearAtomicUpdateResponse locRes) {
+                    onResult(locRes.nodeId(), locRes);
+                }
+            });
+    }
+
+    /**
+     * Sends messages to remote nodes and updates local cache. Remote requests are sent first, the local one (at
+     * most one is expected) is executed afterwards. In {@code FULL_ASYNC} mode the future is completed as soon as
+     * everything has been dispatched.
+     *
+     * @param mappings Mappings to send.
+     */
+    private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
+        UUID locNodeId = cctx.localNodeId();
+
+        GridNearAtomicUpdateRequest locUpdate = null;
+
+        // Send messages to remote nodes first, then run local update.
+        for (GridNearAtomicUpdateRequest req : mappings.values()) {
+            boolean loc = locNodeId.equals(req.nodeId());
+
+            if (loc) {
+                assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
+                    ", req=" + req + ']';
+
+                locUpdate = req;
+
+                continue;
+            }
+
+            try {
+                if (log.isDebugEnabled())
+                    log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
+
+                cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+            }
+            catch (IgniteCheckedException e) {
+                state.onSendError(req, e);
+            }
+        }
+
+        if (locUpdate != null) {
+            CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> locCallback =
+                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+                    @Override public void apply(GridNearAtomicUpdateRequest locReq, GridNearAtomicUpdateResponse locRes) {
+                        onResult(locRes.nodeId(), locRes);
+                    }
+                };
+
+            cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, locCallback);
+        }
+
+        if (syncMode == FULL_ASYNC)
+            onDone(new GridCacheReturn(cctx, true, true, null, true));
+    }
+
+    /**
+     *
+     */
+    private class UpdateState {
+        /** Current topology version ({@code ZERO} while the update is not mapped). */
+        private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
+
+        /** Update version assigned on near node in CLOCK mode ({@code null} otherwise). */
+        private GridCacheVersion updVer;
+
+        /** Topology version when got mapping error. */
+        private AffinityTopologyVersion mapErrTopVer;
+
+        /** Mappings if operation is mapped to more than one node. */
+        @GridToStringInclude
+        private Map<UUID, GridNearAtomicUpdateRequest> mappings;
+
+        /** Number of responses received for the current mappings. */
+        private int resCnt;
+
+        /** Error. */
+        private CachePartialUpdateCheckedException err;
+
+        /** Future version (serves as future ID). */
+        private GridCacheVersion futVer;
+
+        /** Completion future for a particular topology version. */
+        private GridFutureAdapter<Void> topCompleteFut;
+
+        /** Keys to remap. */
+        private Collection<KeyCacheObject> remapKeys;
+
+        /** Not null if operation is mapped to single node. */
+        private GridNearAtomicUpdateRequest singleReq;
+
+        /** Operation result. */
+        private GridCacheReturn opRes;
+
+        /**
+         * Reads the current future version under the state monitor.
+         *
+         * @return Future version, possibly {@code null}.
+         */
+        @Nullable synchronized GridCacheVersion futureVersion() {
+            return futVer;
+        }
+
+        /**
+         * Handles a node leaving the grid. If a request to that node is still waiting for its response, a failure
+         * response covering all request keys is built under the state monitor and then processed as a regular
+         * result (flagged as created on node failure) outside of it.
+         *
+         * @param nodeId Left node ID.
+         */
+        void onNodeLeft(UUID nodeId) {
+            GridNearAtomicUpdateResponse failRes;
+
+            synchronized (this) {
+                GridNearAtomicUpdateRequest pending;
+
+                if (singleReq == null)
+                    pending = mappings == null ? null : mappings.get(nodeId);
+                else if (singleReq.nodeId().equals(nodeId))
+                    pending = singleReq;
+                else
+                    pending = null;
+
+                // Nothing is pending for the left node, or its response has already been received.
+                if (pending == null || pending.response() != null)
+                    return;
+
+                failRes = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+                    nodeId,
+                    pending.futureVersion(),
+                    cctx.deploymentEnabled());
+
+                ClusterTopologyCheckedException topErr = new ClusterTopologyCheckedException("Primary node left grid " +
+                    "before response is received: " + nodeId);
+
+                topErr.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(pending.topologyVersion()));
+
+                failRes.addFailedKeys(pending.keys(), topErr);
+            }
+
+            onResult(nodeId, failRes, true);
+        }
+
+        /**
+         * Processes an update response, either a real one or one synthesized on node failure or send error.
+         * <p>
+         * Under the state monitor the response is matched against the current future version and the pending
+         * request(s); non-matching responses are ignored. Remap keys, failed keys or the operation result are
+         * accumulated, and once all expected responses are received the method decides between remapping
+         * (explicit remap keys, or a topology error while attempts remain) and completion.
+         * <p>
+         * Outside the monitor: a response error without failed keys completes the future immediately; the near
+         * cache is updated if enabled; a remap either fails the future ({@code waitTopFut} is not set, or
+         * topology is locked) or re-maps the remaining keys once affinity for the remap topology version is
+         * ready; otherwise the future is completed with the accumulated result and error.
+         *
+         * @param nodeId Node ID.
+         * @param res Response.
+         * @param nodeErr {@code True} if response was created on node failure.
+         */
+        @SuppressWarnings("unchecked")
+        void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+            GridNearAtomicUpdateRequest req;
+
+            AffinityTopologyVersion remapTopVer = null;
+
+            GridCacheReturn opRes0 = null;
+            CachePartialUpdateCheckedException err0 = null;
+
+            boolean rcvAll;
+
+            GridFutureAdapter<?> fut0 = null;
+
+            synchronized (this) {
+                if (!res.futureVersion().equals(futVer))
+                    return;
+
+                if (singleReq != null) {
+                    if (!singleReq.nodeId().equals(nodeId))
+                        return;
+
+                    req = singleReq;
+
+                    singleReq = null;
+
+                    rcvAll = true;
+                }
+                else {
+                    req = mappings != null ? mappings.get(nodeId) : null;
+
+                    if (req != null && req.onResponse(res)) {
+                        resCnt++;
+
+                        rcvAll = mappings.size() == resCnt;
+                    }
+                    else
+                        return;
+                }
+
+                assert req != null && req.topologyVersion().equals(topVer) : req;
+
+                if (res.remapKeys() != null) {
+                    assert !fastMap || cctx.kernalContext().clientNode();
+
+                    if (remapKeys == null)
+                        remapKeys = U.newHashSet(res.remapKeys().size());
+
+                    remapKeys.addAll(res.remapKeys());
+
+                    if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
+                        mapErrTopVer = req.topologyVersion();
+                }
+                else if (res.error() != null) {
+                    if (res.failedKeys() != null)
+                        addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
+                }
+                else {
+                    if (!req.fastMap() || req.hasPrimary()) {
+                        GridCacheReturn ret = res.returnValue();
+
+                        if (op == TRANSFORM) {
+                            if (ret != null)
+                                addInvokeResults(ret);
+                        }
+                        else
+                            opRes = ret;
+                    }
+                }
+
+                if (rcvAll) {
+                    if (remapKeys != null) {
+                        assert mapErrTopVer != null;
+
+                        remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1);
+                    }
+                    else {
+                        if (err != null &&
+                            X.hasCause(err, CachePartialUpdateCheckedException.class) &&
+                            X.hasCause(err, ClusterTopologyCheckedException.class) &&
+                            storeFuture() &&
+                            --remapCnt > 0) {
+                            ClusterTopologyCheckedException topErr =
+                                X.cause(err, ClusterTopologyCheckedException.class);
+
+                            if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+                                CachePartialUpdateCheckedException cause =
+                                    X.cause(err, CachePartialUpdateCheckedException.class);
+
+                                assert cause != null && cause.topologyVersion() != null : err;
+
+                                remapTopVer =
+                                    new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
+
+                                err = null;
+
+                                Collection<Object> failedKeys = cause.failedKeys();
+
+                                remapKeys = new ArrayList<>(failedKeys.size());
+
+                                for (Object key : failedKeys)
+                                    remapKeys.add(cctx.toCacheKeyObject(key));
+
+                                updVer = null;
+                            }
+                        }
+                    }
+
+                    if (remapTopVer == null) {
+                        err0 = err;
+                        opRes0 = opRes;
+                    }
+                    else {
+                        fut0 = topCompleteFut;
+
+                        topCompleteFut = null;
+
+                        cctx.mvcc().removeAtomicFuture(futVer);
+
+                        futVer = null;
+                        topVer = AffinityTopologyVersion.ZERO;
+                    }
+                }
+            }
+
+            if (res.error() != null && res.failedKeys() == null) {
+                onDone(res.error());
+
+                return;
+            }
+
+            if (rcvAll && nearEnabled) {
+                if (mappings != null) {
+                    for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
+                        GridNearAtomicUpdateResponse res0 = req0.response();
+
+                        assert res0 != null : req0;
+
+                        updateNear(req0, res0);
+                    }
+                }
+                else if (!nodeErr)
+                    updateNear(req, res);
+            }
+
+            if (remapTopVer != null) {
+                if (fut0 != null)
+                    fut0.onDone();
+
+                if (!waitTopFut) {
+                    onDone(new GridCacheTryPutFailedException());
+
+                    return;
+                }
+
+                if (topLocked) {
+                    assert !F.isEmpty(remapKeys) : remapKeys;
+
+                    CachePartialUpdateCheckedException e =
+                        new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+
+                    ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
+                        "Failed to update keys, topology changed while execute atomic update inside transaction.");
+
+                    cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+
+                    e.add(remapKeys, cause);
+
+                    onDone(e);
+
+                    return;
+                }
+
+                IgniteInternalFuture<AffinityTopologyVersion> fut =
+                    cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+
+                if (fut == null)
+                    fut = new GridFinishedFuture<>(remapTopVer);
+
+                fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                    @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                        cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                            @Override public void run() {
+                                try {
+                                    AffinityTopologyVersion topVer = fut.get();
+
+                                    map(topVer, remapKeys);
+                                }
+                                catch (IgniteCheckedException e) {
+                                    onDone(e);
+                                }
+                            }
+                        });
+                    }
+                });
+
+                return;
+            }
+
+            if (rcvAll)
+                onDone(opRes0, err0);
+        }
+
+        /**
+         * Handles a failure to send an update request. A response that marks all request keys as failed with the
+         * given error is synthesized and processed as a regular result (flagged as created on node failure).
+         * <p>
+         * The state monitor is intentionally not held while the result is processed: result processing may complete
+         * the future, notify its listeners, update the near cache or schedule a remap, and it acquires the monitor
+         * itself for the part that touches the state.
+         *
+         * @param req Request.
+         * @param e Error.
+         */
+        void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
+            // Building the response touches no guarded state, so no synchronization is needed here.
+            GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+                req.nodeId(),
+                req.futureVersion(),
+                cctx.deploymentEnabled());
+
+            res.addFailedKeys(req.keys(), e);
+
+            onResult(req.nodeId(), res, true);
+        }
+
+        /**
+         * Maps the update on the given topology version and dispatches the resulting request(s).
+         * <p>
+         * The future is completed with an error if there are no affinity nodes for the topology version.
+         * Otherwise a new future version is generated (plus an update version in {@code CLOCK} mode, unless one
+         * is retained from a previous attempt), the request(s) are built and the state is switched to the new
+         * mapping under the state monitor. Any exception raised while doing so completes the future. Then the
+         * future is added to the MVCC manager when {@code storeFuture()} requires it, and the request(s) are
+         * sent: a single request goes through {@code mapSingle}, several through {@code doUpdate}; an empty key
+         * collection completes the future right away.
+         *
+         * @param topVer Topology version.
+         * @param remapKeys Keys to remap.
+         */
+        void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
+            Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
+
+            if (F.isEmpty(topNodes)) {
+                onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+                    "left the grid)."));
+
+                return;
+            }
+
+            Exception err = null;
+            GridNearAtomicUpdateRequest singleReq0 = null;
+            Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
+
+            int size = keys.size();
+
+            GridCacheVersion futVer = cctx.versions().next(topVer);
+
+            GridCacheVersion updVer;
+
+            // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+            if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
+                updVer = this.updVer;
+
+                if (updVer == null) {
+                    updVer = cctx.versions().next(topVer);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Assigned fast-map version for update on near node: " + updVer);
+                }
+            }
+            else
+                updVer = null;
+
+            try {
+                if (size == 1 && !fastMap) {
+                    assert remapKeys == null || remapKeys.size() == 1;
+
+                    singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+                }
+                else {
+                    Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
+                        topVer,
+                        futVer,
+                        updVer,
+                        remapKeys);
+
+                    if (pendingMappings.size() == 1)
+                        singleReq0 = F.firstValue(pendingMappings);
+                    else {
+                        if (syncMode == PRIMARY_SYNC) {
+                            mappings0 = U.newHashMap(pendingMappings.size());
+
+                            for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
+                                if (req.hasPrimary())
+                                    mappings0.put(req.nodeId(), req);
+                            }
+                        }
+                        else
+                            mappings0 = pendingMappings;
+
+                        assert !mappings0.isEmpty() || size == 0 : GridNearAtomicSingleUpdateFuture.this;
+                    }
+                }
+
+                synchronized (this) {
+                    assert this.futVer == null : this;
+                    assert this.topVer == AffinityTopologyVersion.ZERO : this;
+
+                    this.topVer = topVer;
+                    this.updVer = updVer;
+                    this.futVer = futVer;
+
+                    resCnt = 0;
+
+                    singleReq = singleReq0;
+                    mappings = mappings0;
+
+                    this.remapKeys = null;
+                }
+            }
+            catch (Exception e) {
+                err = e;
+            }
+
+            if (err != null) {
+                onDone(err);
+
+                return;
+            }
+
+            if (storeFuture()) {
+                if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicSingleUpdateFuture.this)) {
+                    assert isDone() : GridNearAtomicSingleUpdateFuture.this;
+
+                    return;
+                }
+            }
+
+            // Optimize mapping for single key.
+            if (singleReq0 != null)
+                mapSingle(singleReq0.nodeId(), singleReq0);
+            else {
+                assert mappings0 != null;
+
+                if (size == 0)
+                    onDone(new GridCacheReturn(cctx, true, true, null, true));
+                else
+                    doUpdate(mappings0);
+            }
+        }
+
+        /**
+         * Provides a future that is completed when this update, mapped on an older topology version, is done.
+         * The returned future is created lazily and shared between callers until {@code onFutureDone()} resets it.
+         *
+         * @param topVer Topology version to compare against the version this update is mapped on.
+         * @return Completion future if the update is mapped on a version lower than {@code topVer};
+         *      {@code null} if the update is not mapped yet or is mapped on the same or a greater version.
+         */
+        @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) {
+            // ZERO is the "not mapped yet" marker; same or greater version means there is nothing to wait for.
+            if (this.topVer == AffinityTopologyVersion.ZERO || this.topVer.compareTo(topVer) >= 0)
+                return null;
+
+            GridFutureAdapter<Void> fut = topCompleteFut;
+
+            if (fut == null)
+                topCompleteFut = fut = new GridFutureAdapter<>();
+
+            return fut;
+        }
+
+        /**
+         * Clears the future version and the topology completion future, then completes the latter (if any).
+         *
+         * @return Future version that was set before this call, possibly {@code null}.
+         */
+        GridCacheVersion onFutureDone() {
+            GridCacheVersion doneVer;
+            GridFutureAdapter<Void> waitFut;
+
+            synchronized (this) {
+                waitFut = topCompleteFut;
+                doneVer = futVer;
+
+                topCompleteFut = null;
+                futVer = null;
+            }
+
+            // Waiters are notified after the monitor is released.
+            if (waitFut != null)
+                waitFut.onDone();
+
+            return doneVer;
+        }
+
+        /**
+         * Builds update requests for all keys of this future, one request per target node.
+         * <p>
+         * Keys are skipped when the resolved value is {@code null} for a non-delete operation, or when
+         * {@code remapKeys} is provided and does not contain the key. Value iterators are advanced for
+         * every key, including skipped ones, so keys and values stay aligned.
+         *
+         * @param topNodes Cache nodes (used only to size the resulting map).
+         * @param topVer Topology version.
+         * @param futVer Future version.
+         * @param updVer Update version.
+         * @param remapKeys Keys to remap, {@code null} to map all keys.
+         * @return Requests keyed by target node ID.
+         * @throws Exception If failed: {@link NullPointerException} on a {@code null} key or {@code null} regular
+         *      value, {@code ClusterTopologyServerNotFoundException} if a key has no affinity nodes.
+         */
+        private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
+            AffinityTopologyVersion topVer,
+            GridCacheVersion futVer,
+            @Nullable GridCacheVersion updVer,
+            @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
+            Iterator<?> valsIt = vals != null ? vals.iterator() : null;
+            Iterator<GridCacheDrInfo> drPutIt = conflictPutVals != null ? conflictPutVals.iterator() : null;
+            Iterator<GridCacheVersion> drRmvIt = conflictRmvVals != null ? conflictRmvVals.iterator() : null;
+
+            Map<UUID, GridNearAtomicUpdateRequest> reqs = U.newHashMap(topNodes.size());
+
+            // Create mappings first, then send messages.
+            for (Object key : keys) {
+                if (key == null)
+                    throw new NullPointerException("Null key.");
+
+                // Defaults correspond to a regular (non-conflict) remove.
+                Object val = null;
+                GridCacheVersion conflictVer = null;
+                long conflictTtl = CU.TTL_NOT_CHANGED;
+                long conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+
+                if (vals != null) {
+                    val = valsIt.next();
+
+                    if (val == null)
+                        throw new NullPointerException("Null value.");
+                }
+                else if (conflictPutVals != null) {
+                    GridCacheDrInfo drInfo = drPutIt.next();
+
+                    val = drInfo.valueEx();
+                    conflictVer = drInfo.version();
+                    conflictTtl = drInfo.ttl();
+                    conflictExpireTime = drInfo.expireTime();
+                }
+                else if (conflictRmvVals != null)
+                    conflictVer = drRmvIt.next();
+
+                // Nothing to send for a value-less entry unless this is a delete.
+                if (val == null && op != GridCacheOperation.DELETE)
+                    continue;
+
+                KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+
+                if (remapKeys != null && !remapKeys.contains(cacheKey))
+                    continue;
+
+                if (op != TRANSFORM)
+                    val = cctx.toCacheObject(val);
+
+                Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
+
+                if (affNodes.isEmpty())
+                    throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+                        "(all partition nodes left the grid).");
+
+                // True only for the first affinity node of the key (presumably the primary - confirm
+                // against GridNearAtomicUpdateRequest.addUpdateEntry).
+                boolean first = true;
+
+                for (ClusterNode affNode : affNodes) {
+                    if (affNode == null)
+                        throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+                            "(all partition nodes left the grid).");
+
+                    UUID nodeId = affNode.id();
+
+                    GridNearAtomicUpdateRequest req = reqs.get(nodeId);
+
+                    if (req == null) {
+                        req = new GridNearAtomicUpdateRequest(
+                            cctx.cacheId(),
+                            nodeId,
+                            futVer,
+                            fastMap,
+                            updVer,
+                            topVer,
+                            topLocked,
+                            syncMode,
+                            op,
+                            retval,
+                            expiryPlc,
+                            invokeArgs,
+                            filter,
+                            subjId,
+                            taskNameHash,
+                            skipStore,
+                            keepBinary,
+                            cctx.kernalContext().clientNode(),
+                            cctx.deploymentEnabled(),
+                            keys.size());
+
+                        reqs.put(nodeId, req);
+                    }
+
+                    req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, first);
+
+                    first = false;
+                }
+            }
+
+            return reqs;
+        }
+
+        /**
+         * Builds a single update request for the first key of this future, addressed to the node returned
+         * by {@code cctx.affinity().primary(...)} for that key.
+         *
+         * @param topVer Topology version.
+         * @param futVer Future version.
+         * @param updVer Update version.
+         * @return Request.
+         * @throws Exception If failed: {@link NullPointerException} on a {@code null} key or on a {@code null}
+         *      value for a non-delete operation, {@code ClusterTopologyServerNotFoundException} if the key
+         *      has no primary node.
+         */
+        private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
+            GridCacheVersion futVer,
+            @Nullable GridCacheVersion updVer) throws Exception {
+            Object key = F.first(keys);
+
+            // Defaults correspond to a regular REMOVE.
+            Object val = null;
+            GridCacheVersion conflictVer = null;
+            long conflictTtl = CU.TTL_NOT_CHANGED;
+            long conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+
+            if (vals != null) {
+                // Regular PUT.
+                val = F.first(vals);
+            }
+            else if (conflictPutVals != null) {
+                // Conflict PUT.
+                GridCacheDrInfo drInfo = F.first(conflictPutVals);
+
+                val = drInfo.valueEx();
+                conflictVer = drInfo.version();
+                conflictTtl = drInfo.ttl();
+                conflictExpireTime = drInfo.expireTime();
+            }
+            else if (conflictRmvVals != null) {
+                // Conflict REMOVE.
+                conflictVer = F.first(conflictRmvVals);
+            }
+
+            // A null key is still possible here when the user passes a map with a single element.
+            if (key == null)
+                throw new NullPointerException("Null key.");
+
+            if (val == null && op != GridCacheOperation.DELETE)
+                throw new NullPointerException("Null value.");
+
+            KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+
+            if (op != TRANSFORM)
+                val = cctx.toCacheObject(val);
+
+            ClusterNode primaryNode = cctx.affinity().primary(cacheKey, topVer);
+
+            if (primaryNode == null)
+                throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+                    "left the grid).");
+
+            GridNearAtomicUpdateRequest singleReq = new GridNearAtomicUpdateRequest(
+                cctx.cacheId(),
+                primaryNode.id(),
+                futVer,
+                fastMap,
+                updVer,
+                topVer,
+                topLocked,
+                syncMode,
+                op,
+                retval,
+                expiryPlc,
+                invokeArgs,
+                filter,
+                subjId,
+                taskNameHash,
+                skipStore,
+                keepBinary,
+                cctx.kernalContext().clientNode(),
+                cctx.deploymentEnabled(),
+                1);
+
+            singleReq.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, true);
+
+            return singleReq;
+        }
+
+        /**
+         * Accumulates an invoke (TRANSFORM) result received from a single node into {@code opRes}:
+         * the first non-empty result is stored as is, subsequent ones are merged into it.
+         *
+         * @param ret Result from single node.
+         */
+        @SuppressWarnings("unchecked")
+        private void addInvokeResults(GridCacheReturn ret) {
+            assert op == TRANSFORM : op;
+            assert ret.value() == null || ret.value() instanceof Map : ret.value();
+
+            // Nothing to accumulate for an empty result.
+            if (ret.value() == null)
+                return;
+
+            if (opRes == null)
+                opRes = ret;
+            else
+                opRes.mergeEntryProcessResults(ret);
+        }
+
+        /**
+         * Registers failed keys in the partial update exception of this state, creating the exception on first use.
+         * Keys are passed through {@code unwrapBinaryIfNeeded(...)} before being added.
+         *
+         * @param failedKeys Failed keys.
+         * @param topVer Topology version for failed update.
+         * @param err Error cause.
+         */
+        private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
+            AffinityTopologyVersion topVer,
+            Throwable err) {
+            CachePartialUpdateCheckedException partial = this.err;
+
+            if (partial == null) {
+                partial = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+
+                this.err = partial;
+            }
+
+            Collection<Object> unwrapped = new ArrayList<>(failedKeys.size());
+
+            for (KeyCacheObject failedKey : failedKeys)
+                unwrapped.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(failedKey, keepBinary, false));
+
+            partial.add(unwrapped, err, topVer);
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized String toString() {
+            // The state monitor is held while the string is built.
+            return S.toString(UpdateState.class, this);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        // Appends the parent's string form to this future's own representation.
+        return S.toString(GridNearAtomicSingleUpdateFuture.class, this, super.toString());
+    }
+}