You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/02 15:34:27 UTC

[1/2] ignite git commit: ignite-4705

Repository: ignite
Updated Branches:
  refs/heads/ignite-4705-2 [created] 30bfae0e6


http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 204e510..6228f03 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -269,7 +269,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
 
                 if (msgLog.isDebugEnabled()) {
                     msgLog.debug("Near update fut, sent request [futId=" + req.futureId() +
-                        ", writeVer=" + req.updateVersion() +
                         ", node=" + req.nodeId() + ']');
                 }
 
@@ -279,7 +278,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             catch (IgniteCheckedException e) {
                 if (msgLog.isDebugEnabled()) {
                     msgLog.debug("Near update fut, failed to send request [futId=" + req.futureId() +
-                        ", writeVer=" + req.updateVersion() +
                         ", node=" + req.nodeId() +
                         ", err=" + e + ']');
                 }
@@ -305,12 +303,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     public abstract void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res);
 
     /**
-     * @param nodeId Node ID.
-     * @param res Response.
-     */
-    public abstract void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res);
-
-    /**
      * @param req Request.
      * @param e Error.
      */
@@ -423,23 +415,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         }
 
         /**
-         * @param cctx Context.
-         * @param res Response.
-         * @return {@code True} if request processing finished.
-         */
-        boolean onMappingReceived(GridCacheContext cctx, GridNearAtomicMappingResponse res) {
-            if (finished() || mapping != null)
-                return false;
-
-            if (res.affinityMapping())
-                initAffinityMapping(cctx, null);
-            else
-                initMapping(cctx, res.mapping(), null);
-
-            return finished();
-        }
-
-        /**
          * @param nodeId Node ID.
          * @return {@code True} if request processing finished.
          */
@@ -465,41 +440,41 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             if (finished())
                 return false;
 
-            if (res.primaryDhtFailureResponse()) {
-                assert res.mapping() != null : res;
-                assert res.failedNodeId() != null : res;
-
-                nodeId = res.failedNodeId();
-            }
-
-            if (res.hasResult())
-                hasRes = true;
-
-            if (res.affinityMapping()) {
-                if (mapping == null) {
-                    initAffinityMapping(cctx, nodeId);
-
-                    return finished();
-                }
-            } else if (res.mapping() != null) {
-                // Mapping is sent from dht nodes.
-                if (mapping == null) {
-                    initMapping(cctx, res.mapping(), nodeId);
-
-                    return finished();
-                }
-            }
-            else {
-                // Mapping and result are sent from primary.
-                if (mapping == null) {
-                    if (rcvd == null)
-                        rcvd = new HashSet<>();
-
-                    rcvd.add(nodeId);
-
-                    return false; // Need wait for response from primary.
-                }
-            }
+//            if (res.primaryDhtFailureResponse()) {
+//                assert res.mapping() != null : res;
+//                assert res.failedNodeId() != null : res;
+//
+//                nodeId = res.failedNodeId();
+//            }
+//
+//            if (res.hasResult())
+//                hasRes = true;
+//
+//            if (res.affinityMapping()) {
+//                if (mapping == null) {
+//                    initAffinityMapping(cctx, nodeId);
+//
+//                    return finished();
+//                }
+//            } else if (res.mapping() != null) {
+//                // Mapping is sent from dht nodes.
+//                if (mapping == null) {
+//                    initMapping(cctx, res.mapping(), nodeId);
+//
+//                    return finished();
+//                }
+//            }
+//            else {
+//                // Mapping and result are sent from primary.
+//                if (mapping == null) {
+//                    if (rcvd == null)
+//                        rcvd = new HashSet<>();
+//
+//                    rcvd.add(nodeId);
+//
+//                    return false; // Need wait for response from primary.
+//                }
+//            }
 
             return mapping.remove(nodeId) && finished();
         }
@@ -523,12 +498,12 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
 
             assert res.returnValue() != null : res;
 
-            if (res.mapping() != null) {
-                if (mapping == null)
-                    initMapping(cctx, res.mapping(), null);
-            }
-            else
-                initAffinityMapping(cctx, null);
+//            if (res.mapping() != null) {
+//                if (mapping == null)
+//                    initMapping(cctx, res.mapping(), null);
+//            }
+//            else
+//                initAffinityMapping(cctx, null);
 
             return finished();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index d5c7a9e..a40cfe0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -21,14 +21,19 @@ import java.util.List;
 import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -38,109 +43,282 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     /** Message index. */
     public static final int CACHE_MSG_IDX = nextIndexId();
 
-    /**
-     * @return Mapped node ID.
-     */
-    public abstract UUID nodeId();
+    /** Stable topology flag mask. */
+    private static final int STABLE_TOP_FLAG_MASK = 0x01;
 
-    /**
-     * @param nodeId Node ID.
-     */
-    public abstract void nodeId(UUID nodeId);
+    /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
+    private static final int TOP_LOCKED_FLAG_MASK = 0x02;
 
-    /**
-     * @return Subject ID.
-     */
-    public abstract UUID subjectId();
+    /** Skip write-through to a persistent storage. */
+    private static final int SKIP_STORE_FLAG_MASK = 0x04;
+
+    /** Keep binary flag. */
+    private static final int KEEP_BINARY_FLAG_MASK = 0x08;
+
+    /** Return value flag. */
+    private static final int RET_VAL_FLAG_MASK = 0x10;
+
+    /** Target node ID. */
+    @GridDirectTransient
+    protected UUID nodeId;
+
+    /** Future version. */
+    protected long futId;
+
+    /** Topology version. */
+    protected AffinityTopologyVersion topVer;
+
+    /** Write synchronization mode. */
+    protected CacheWriteSynchronizationMode syncMode;
+
+    /** Update operation. */
+    protected GridCacheOperation op;
+
+    /** Subject ID. */
+    protected UUID subjId;
+
+    /** Task name hash. */
+    protected int taskNameHash;
+
+    /** Compressed boolean flags. */
+    protected byte flags;
+
+    /** */
+    @GridDirectTransient
+    private GridNearAtomicUpdateResponse res;
 
     /**
-     * @return Task name hash.
+     *
      */
-    public abstract int taskNameHash();
+    public GridNearAtomicAbstractUpdateRequest() {
+        // No-op.
+    }
 
     /**
-     * @return Future version.
-     */
-    public abstract long futureId();
+     * Constructor.
+     *
+     * @param cacheId Cache ID.
+     * @param nodeId Node ID.
+     * @param futId Future ID.
+     * @param topVer Topology version.
+     * @param topLocked Topology locked flag.
+     * @param syncMode Synchronization mode.
+     * @param op Cache update operation.
+     * @param retval Return value required flag.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param skipStore Skip write-through to a persistent storage.
+     * @param keepBinary Keep binary flag.
+     * @param addDepInfo Deployment info flag.
+     */
+    protected GridNearAtomicAbstractUpdateRequest(
+        int cacheId,
+        UUID nodeId,
+        long futId,
+        @NotNull AffinityTopologyVersion topVer,
+        boolean topLocked,
+        CacheWriteSynchronizationMode syncMode,
+        GridCacheOperation op,
+        boolean retval,
+        @Nullable UUID subjId,
+        int taskNameHash,
+        boolean stableTop,
+        boolean skipStore,
+        boolean keepBinary,
+        boolean addDepInfo
+    ) {
+        this.cacheId = cacheId;
+        this.nodeId = nodeId;
+        this.futId = futId;
+        this.topVer = topVer;
+        this.syncMode = syncMode;
+        this.op = op;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.addDepInfo = addDepInfo;
+
+        if (stableTop)
+            stableTopology(true);
+
+        if (topLocked)
+            topologyLocked(true);
+
+        if (retval)
+            returnValue(true);
+
+        if (skipStore)
+            skipStore(true);
+
+        if (keepBinary)
+            keepBinary(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public final int lookupIndex() {
+        return CACHE_MSG_IDX;
+    }
+
+    /** {@inheritDoc} */
+    @Override public final boolean addDeploymentInfo() {
+        return addDepInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public final IgniteLogger messageLogger(GridCacheSharedContext ctx) {
+        return ctx.atomicMessageLogger();
+    }
+
+    boolean stableTopology() {
+        return isFlag(STABLE_TOP_FLAG_MASK);
+    }
+
+    boolean dhtReplyToNear() {
+        return stableTopology() && syncMode == CacheWriteSynchronizationMode.FULL_SYNC;
+    }
+
+    void stableTopology(boolean stableTop) {
+        setFlag(stableTop, STABLE_TOP_FLAG_MASK);
+    }
+
+    public int taskNameHash() {
+        return taskNameHash;
+    }
+
+    public GridCacheOperation operation() {
+        return op;
+    }
+
+    public UUID subjectId() {
+        return subjId;
+    }
+
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    void nodeId(UUID nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    public long futureId() {
+        return futId;
+    }
+
+    public final CacheWriteSynchronizationMode writeSynchronizationMode() {
+        return syncMode;
+    }
 
     /**
-     * @return Flag indicating whether this is fast-map udpate.
-     * TODO IGNITE-4705
+     * @param res Response.
+     * @return {@code True} if current response was {@code null}.
      */
-    public abstract boolean fastMap();
+    public boolean onResponse(GridNearAtomicUpdateResponse res) {
+        if (this.res == null) {
+            this.res = res;
+
+            return true;
+        }
+
+        return false;
+    }
 
     /**
-     * @return Update version for fast-map request.
-     * TODO IGNITE-4705
+     * @return Response.
      */
-    public abstract GridCacheVersion updateVersion();
+    @Nullable public GridNearAtomicUpdateResponse response() {
+        return res;
+    }
 
     /**
      * @return Topology locked flag.
      */
-    public abstract boolean topologyLocked();
+    public final boolean topologyLocked() {
+        return isFlag(TOP_LOCKED_FLAG_MASK);
+    }
 
     /**
-     * @return {@code True} if request sent from client node.
+     * Sets topologyLocked flag value.
      */
-    public abstract boolean clientRequest();
+    public final void topologyLocked(boolean val) {
+        setFlag(val, TOP_LOCKED_FLAG_MASK);
+    }
 
     /**
-     * @return Cache write synchronization mode.
+     * @return Return value flag.
      */
-    public abstract CacheWriteSynchronizationMode writeSynchronizationMode();
+    public final boolean returnValue() {
+        return isFlag(RET_VAL_FLAG_MASK);
+    }
 
     /**
-     * @return Expiry policy.
+     * Sets returnValue flag value.
      */
-    public abstract ExpiryPolicy expiry();
+    public final void returnValue(boolean val) {
+        setFlag(val, RET_VAL_FLAG_MASK);
+    }
 
     /**
-     * @return Return value flag.
+     * @return Skip write-through to a persistent storage.
      */
-    public abstract boolean returnValue();
+    public final boolean skipStore() {
+        return isFlag(SKIP_STORE_FLAG_MASK);
+    }
 
     /**
-     * @return Filter.
+     * Sets skipStore flag value.
      */
-    @Nullable public abstract CacheEntryPredicate[] filter();
+    public void skipStore(boolean val) {
+        setFlag(val, SKIP_STORE_FLAG_MASK);
+    }
 
     /**
-     * @return Skip write-through to a persistent storage.
+     * @return Keep binary flag.
      */
-    public abstract boolean skipStore();
+    public final boolean keepBinary() {
+        return isFlag(KEEP_BINARY_FLAG_MASK);
+    }
 
     /**
-     * @return Keep binary flag.
+     * Sets keepBinary flag value.
      */
-    public abstract boolean keepBinary();
+    public void keepBinary(boolean val) {
+        setFlag(val, KEEP_BINARY_FLAG_MASK);
+    }
 
     /**
-     * @return Update operation.
+     * Sets flag mask.
+     *
+     * @param flag Set or clear.
+     * @param mask Mask.
      */
-    public abstract GridCacheOperation operation();
+    private void setFlag(boolean flag, int mask) {
+        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+    }
 
     /**
-     * @return Optional arguments for entry processor.
+     * Reads flag mask.
+     *
+     * @param mask Mask to read.
+     * @return Flag value.
      */
-    @Nullable public abstract Object[] invokeArguments();
+    private boolean isFlag(int mask) {
+        return (flags & mask) != 0;
+    }
 
     /**
-     * @return Flag indicating whether this request contains primary keys.
-     * TODO IGNITE-4705
+     * @return Expiry policy.
      */
-    public abstract boolean hasPrimary();
+    public abstract ExpiryPolicy expiry();
 
     /**
-     * @param res Response.
-     * @return {@code True} if current response was {@code null}.
+     * @return Filter.
      */
-    public abstract boolean onResponse(GridNearAtomicUpdateResponse res);
+    @Nullable public abstract CacheEntryPredicate[] filter();
 
     /**
-     * @return Response.
+     * @return Optional arguments for entry processor.
      */
-    @Nullable public abstract GridNearAtomicUpdateResponse response();
+    @Nullable public abstract Object[] invokeArguments();
 
     /**
      * @param key Key to add.
@@ -148,14 +326,12 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
      * @param conflictTtl Conflict TTL (optional).
      * @param conflictExpireTime Conflict expire time (optional).
      * @param conflictVer Conflict version (optional).
-     * @param primary If given key is primary on this mapping.
      */
-    public abstract void addUpdateEntry(KeyCacheObject key,
+    abstract void addUpdateEntry(KeyCacheObject key,
         @Nullable Object val,
         long conflictTtl,
         long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer,
-        boolean primary);
+        @Nullable GridCacheVersion conflictVer);
 
     /**
      * @return Keys for this update request.
@@ -185,7 +361,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
      */
     public abstract CacheObject writeValue(int idx);
 
-
     /**
      * @return Conflict versions.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 869cfbc..331bcdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -26,7 +26,6 @@ import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
@@ -41,10 +40,10 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
@@ -61,56 +60,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Target node ID. */
-    @GridDirectTransient
-    private UUID nodeId;
-
-    /** Future version. */
-    private long futId;
-
-    /** Update version. Set to non-null if fastMap is {@code true}. */
-    private GridCacheVersion updateVer;
-
-    /** Topology version. */
-    private AffinityTopologyVersion topVer;
-
-    /** Write synchronization mode. */
-    private CacheWriteSynchronizationMode syncMode;
-
-    /** Update operation. */
-    private GridCacheOperation op;
-
-    /** Subject ID. */
-    protected UUID subjId;
-
-    /** Task name hash. */
-    protected int taskNameHash;
-
-    /** */
-    @GridDirectTransient
-    private GridNearAtomicUpdateResponse res;
-
-    /** Fast map flag. */
-    protected boolean fastMap;
-
-    /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
-    protected boolean topLocked;
-
-    /** Flag indicating whether request contains primary keys. */
-    protected boolean hasPrimary;
-
-    /** Skip write-through to a persistent storage. */
-    protected boolean skipStore;
-
-    /** */
-    protected boolean clientReq;
-
-    /** Keep binary flag. */
-    protected boolean keepBinary;
-
-    /** Return value flag. */
-    protected boolean retval;
-
     /** Keys to update. */
     @GridToStringInclude
     @GridDirectCollection(KeyCacheObject.class)
@@ -120,10 +69,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     @GridDirectCollection(CacheObject.class)
     private List<CacheObject> vals;
 
-    /** Partitions of keys. */
-    @GridDirectCollection(int.class)
-    private List<Integer> partIds;
-
     /** Entry processors. */
     @GridDirectTransient
     private List<EntryProcessor<Object, Object, Object>> entryProcessors;
@@ -177,7 +122,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
      * @param nodeId Node ID.
      * @param futId Future ID.
      * @param fastMap Fast map scheme flag.
-     * @param updateVer Update version set if fast map is performed.
      * @param topVer Topology version.
      * @param topLocked Topology locked flag.
      * @param syncMode Synchronization mode.
@@ -190,7 +134,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
      * @param taskNameHash Task name hash code.
      * @param skipStore Skip write-through to a persistent storage.
      * @param keepBinary Keep binary flag.
-     * @param clientReq Client node request flag.
      * @param addDepInfo Deployment info flag.
      * @param maxEntryCnt Maximum entries count.
      */
@@ -198,8 +141,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         int cacheId,
         UUID nodeId,
         long futId,
-        boolean fastMap,
-        @Nullable GridCacheVersion updateVer,
         @NotNull AffinityTopologyVersion topVer,
         boolean topLocked,
         CacheWriteSynchronizationMode syncMode,
@@ -210,32 +151,29 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         @Nullable CacheEntryPredicate[] filter,
         @Nullable UUID subjId,
         int taskNameHash,
+        boolean stableTop,
         boolean skipStore,
         boolean keepBinary,
-        boolean clientReq,
         boolean addDepInfo,
         int maxEntryCnt
     ) {
-        this.cacheId = cacheId;
-        this.nodeId = nodeId;
-        this.futId = futId;
-        this.fastMap = fastMap;
-        this.updateVer = updateVer;
-
-        this.topVer = topVer;
-        this.topLocked = topLocked;
-        this.syncMode = syncMode;
-        this.op = op;
-        this.retval = retval;
+        super(cacheId,
+            nodeId,
+            futId,
+            topVer,
+            topLocked,
+            syncMode,
+            op,
+            retval,
+            subjId,
+            taskNameHash,
+            stableTop,
+            skipStore,
+            keepBinary,
+            addDepInfo);
         this.expiryPlc = expiryPlc;
         this.invokeArgs = invokeArgs;
         this.filter = filter;
-        this.subjId = subjId;
-        this.taskNameHash = taskNameHash;
-        this.skipStore = skipStore;
-        this.keepBinary = keepBinary;
-        this.clientReq = clientReq;
-        this.addDepInfo = addDepInfo;
 
         // By default ArrayList expands to array of 10 elements on first add. We cannot guess how many entries
         // will be added to request because of unknown affinity distribution. However, we DO KNOW how many keys
@@ -244,84 +182,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         initSize = Math.min(maxEntryCnt, 10);
 
         keys = new ArrayList<>(initSize);
-
-        partIds = new ArrayList<>(initSize);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int lookupIndex() {
-        return CACHE_MSG_IDX;
-    }
-
-    /** {@inheritDoc} */
-    @Override public UUID nodeId() {
-        return nodeId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void nodeId(UUID nodeId) {
-        this.nodeId = nodeId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public UUID subjectId() {
-        return subjId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int taskNameHash() {
-        return taskNameHash;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long futureId() {
-        return futId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheVersion updateVersion() {
-        return updateVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public AffinityTopologyVersion topologyVersion() {
-        return topVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
-        return syncMode;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheOperation operation() {
-        return op;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onResponse(GridNearAtomicUpdateResponse res) {
-        if (this.res == null) {
-            this.res = res;
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override @Nullable public GridNearAtomicUpdateResponse response() {
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean addDeploymentInfo() {
-        return addDepInfo;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
-        return ctx.atomicMessageLogger();
     }
 
     /** {@inheritDoc} */
@@ -329,8 +189,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         @Nullable Object val,
         long conflictTtl,
         long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer,
-        boolean primary) {
+        @Nullable GridCacheVersion conflictVer) {
         EntryProcessor<Object, Object, Object> entryProcessor = null;
 
         if (op == TRANSFORM) {
@@ -342,7 +201,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         assert val != null || op == DELETE;
 
         keys.add(key);
-        partIds.add(key.partition());
 
         if (entryProcessor != null) {
             if (entryProcessors == null)
@@ -359,8 +217,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
             vals.add((CacheObject)val);
         }
 
-        hasPrimary |= primary;
-
         // In case there is no conflict, do not create the list.
         if (conflictVer != null) {
             if (conflictVers == null) {
@@ -488,41 +344,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     }
 
     /** {@inheritDoc} */
-    @Override public boolean fastMap() {
-        return fastMap;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean topologyLocked() {
-        return topLocked;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean clientRequest() {
-        return clientReq;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean returnValue() {
-        return retval;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean skipStore() {
-        return skipStore;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean keepBinary() {
-        return keepBinary;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean hasPrimary() {
-        return hasPrimary;
-    }
-
-    /** {@inheritDoc} */
     @Override @Nullable public CacheEntryPredicate[] filter() {
         return filter;
     }
@@ -600,397 +421,22 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         }
         else
             finishUnmarshalCacheObjects(vals, cctx, ldr);
-
-        if (partIds != null && !partIds.isEmpty()) {
-            assert partIds.size() == keys.size();
-
-            for (int i = 0; i < keys.size(); i++)
-                keys.get(i).partition(partIds.get(i));
-        }
     }
 
     /** {@inheritDoc} */
     @Override public int partition() {
-        return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+        assert !F.isEmpty(keys);
+
+        return keys.get(0).partition();
     }
 
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!super.writeTo(buf, writer))
-            return false;
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 3:
-                if (!writer.writeBoolean("clientReq", clientReq))
-                    return false;
-
-                writer.incrementState();
-
-            case 4:
-                if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
-                    return false;
-
-                writer.incrementState();
-
-            case 5:
-                if (!writer.writeMessage("conflictTtls", conflictTtls))
-                    return false;
-
-                writer.incrementState();
-
-            case 6:
-                if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-            case 7:
-                if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
-                    return false;
-
-                writer.incrementState();
-
-            case 8:
-                if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 9:
-                if (!writer.writeBoolean("fastMap", fastMap))
-                    return false;
-
-                writer.incrementState();
-
-            case 10:
-                if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-            case 11:
-                if (!writer.writeLong("futId", futId))
-                    return false;
-
-                writer.incrementState();
-
-            case 12:
-                if (!writer.writeBoolean("hasPrimary", hasPrimary))
-                    return false;
-
-                writer.incrementState();
-
-            case 13:
-                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
-                    return false;
-
-                writer.incrementState();
-
-            case 14:
-                if (!writer.writeBoolean("keepBinary", keepBinary))
-                    return false;
-
-                writer.incrementState();
-
-            case 15:
-                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-            case 16:
-                if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
-                    return false;
-
-                writer.incrementState();
-
-            case 17:
-                if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT))
-                    return false;
-
-                writer.incrementState();
-
-            case 18:
-                if (!writer.writeBoolean("retval", retval))
-                    return false;
-
-                writer.incrementState();
-
-            case 19:
-                if (!writer.writeBoolean("skipStore", skipStore))
-                    return false;
-
-                writer.incrementState();
-
-            case 20:
-                if (!writer.writeUuid("subjId", subjId))
-                    return false;
-
-                writer.incrementState();
-
-            case 21:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
-                    return false;
-
-                writer.incrementState();
-
-            case 22:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
-                    return false;
-
-                writer.incrementState();
-
-            case 23:
-                if (!writer.writeBoolean("topLocked", topLocked))
-                    return false;
-
-                writer.incrementState();
-
-            case 24:
-                if (!writer.writeMessage("topVer", topVer))
-                    return false;
-
-                writer.incrementState();
-
-            case 25:
-                if (!writer.writeMessage("updateVer", updateVer))
-                    return false;
-
-                writer.incrementState();
-
-            case 26:
-                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
         return true;
     }
 
     /** {@inheritDoc} */
     @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        if (!super.readFrom(buf, reader))
-            return false;
-
-        switch (reader.state()) {
-            case 3:
-                clientReq = reader.readBoolean("clientReq");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 4:
-                conflictExpireTimes = reader.readMessage("conflictExpireTimes");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 5:
-                conflictTtls = reader.readMessage("conflictTtls");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 6:
-                conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 7:
-                entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 8:
-                expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 9:
-                fastMap = reader.readBoolean("fastMap");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 10:
-                filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 11:
-                futId = reader.readLong("futId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 12:
-                hasPrimary = reader.readBoolean("hasPrimary");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 13:
-                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 14:
-                keepBinary = reader.readBoolean("keepBinary");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 15:
-                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 16:
-                byte opOrd;
-
-                opOrd = reader.readByte("op");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                op = GridCacheOperation.fromOrdinal(opOrd);
-
-                reader.incrementState();
-
-            case 17:
-                partIds = reader.readCollection("partIds", MessageCollectionItemType.INT);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 18:
-                retval = reader.readBoolean("retval");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 19:
-                skipStore = reader.readBoolean("skipStore");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 20:
-                subjId = reader.readUuid("subjId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 21:
-                byte syncModeOrd;
-
-                syncModeOrd = reader.readByte("syncMode");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
-                reader.incrementState();
-
-            case 22:
-                taskNameHash = reader.readInt("taskNameHash");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 23:
-                topLocked = reader.readBoolean("topLocked");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 24:
-                topVer = reader.readMessage("topVer");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 25:
-                updateVer = reader.readMessage("updateVer");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 26:
-                vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
         return reader.afterMessageRead(GridNearAtomicFullUpdateRequest.class);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
deleted file mode 100644
index 855fb78..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.UUID;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class GridNearAtomicMappingResponse extends GridCacheMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Message index. */
-    public static final int CACHE_MSG_IDX = nextIndexId();
-
-    /** */
-    private static final int AFF_MAPPING_FLAG_MASK = 0x01;
-
-    /** */
-    private int part;
-
-    /** */
-    @GridDirectCollection(UUID.class)
-    private List<UUID> mapping;
-
-    /** */
-    private long futId;
-
-    /** */
-    private byte flags;
-
-    /**
-     *
-     */
-    public GridNearAtomicMappingResponse() {
-        // No-op.
-    }
-
-    /**
-     * @param cacheId Cache ID.
-     * @param part Partition.
-     * @param futId Future ID.
-     * @param mapping Mapping.
-     * @param affMapping {@code True} if update mapping matches affinity function result.
-     */
-    GridNearAtomicMappingResponse(int cacheId, int part, long futId, List<UUID> mapping, boolean affMapping) {
-        assert mapping == null || !affMapping;
-
-        this.cacheId = cacheId;
-        this.part = part;
-        this.futId = futId;
-        this.mapping = mapping;
-
-        if (affMapping)
-            setFlag(true, AFF_MAPPING_FLAG_MASK);
-    }
-
-    /**
-     * @return {@code True} if update mapping matches affinity function result.
-     */
-    boolean affinityMapping() {
-        return isFlag(AFF_MAPPING_FLAG_MASK);
-    }
-
-    /**
-     * Sets flag mask.
-     *
-     * @param flag Set or clear.
-     * @param mask Mask.
-     */
-    protected final void setFlag(boolean flag, int mask) {
-        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
-    }
-
-    /**
-     * Reags flag mask.
-     *
-     * @param mask Mask to read.
-     * @return Flag value.
-     */
-    final boolean isFlag(int mask) {
-        return (flags & mask) != 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int partition() {
-        return part;
-    }
-
-    /**
-     * @return Mapping.
-     */
-    public List<UUID> mapping() {
-        return mapping;
-    }
-
-    /**
-     * @return Future ID.
-     */
-    public long futureId() {
-        return futId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int lookupIndex() {
-        return CACHE_MSG_IDX;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return -47;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 7;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean addDeploymentInfo() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!super.writeTo(buf, writer))
-            return false;
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 3:
-                if (!writer.writeByte("flags", flags))
-                    return false;
-
-                writer.incrementState();
-
-            case 4:
-                if (!writer.writeLong("futId", futId))
-                    return false;
-
-                writer.incrementState();
-
-            case 5:
-                if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
-                    return false;
-
-                writer.incrementState();
-
-            case 6:
-                if (!writer.writeInt("part", part))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        if (!super.readFrom(buf, reader))
-            return false;
-
-        switch (reader.state()) {
-            case 3:
-                flags = reader.readByte("flags");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 4:
-                futId = reader.readLong("futId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 5:
-                mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 6:
-                part = reader.readInt("part");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return reader.afterMessageRead(GridNearAtomicMappingResponse.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridNearAtomicMappingResponse.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
index c474c83..b9ded3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
@@ -59,8 +59,6 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
      * @param cacheId Cache ID.
      * @param nodeId Node ID.
      * @param futId Future ID.
-     * @param fastMap Fast map scheme flag.
-     * @param updateVer Update version set if fast map is performed.
      * @param topVer Topology version.
      * @param topLocked Topology locked flag.
      * @param syncMode Synchronization mode.
@@ -71,15 +69,12 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
      * @param taskNameHash Task name hash code.
      * @param skipStore Skip write-through to a persistent storage.
      * @param keepBinary Keep binary flag.
-     * @param clientReq Client node request flag.
      * @param addDepInfo Deployment info flag.
      */
     GridNearAtomicSingleUpdateFilterRequest(
         int cacheId,
         UUID nodeId,
         long futId,
-        boolean fastMap,
-        @Nullable GridCacheVersion updateVer,
         @NotNull AffinityTopologyVersion topVer,
         boolean topLocked,
         CacheWriteSynchronizationMode syncMode,
@@ -88,17 +83,15 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
         @Nullable CacheEntryPredicate[] filter,
         @Nullable UUID subjId,
         int taskNameHash,
+        boolean stableTop,
         boolean skipStore,
         boolean keepBinary,
-        boolean clientReq,
         boolean addDepInfo
     ) {
         super(
             cacheId,
             nodeId,
             futId,
-            fastMap,
-            updateVer,
             topVer,
             topLocked,
             syncMode,
@@ -106,9 +99,9 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
             retval,
             subjId,
             taskNameHash,
+            stableTop,
             skipStore,
             keepBinary,
-            clientReq,
             addDepInfo
         );
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index b1b951f..a741735 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -102,8 +102,21 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         int remapCnt,
         boolean waitTopFut
     ) {
-        super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash,
-            skipStore, keepBinary, remapCnt, waitTopFut);
+        super(cctx,
+            cache,
+            syncMode,
+            op,
+            invokeArgs,
+            retval,
+            rawRetval,
+            expiryPlc,
+            filter,
+            subjId,
+            taskNameHash,
+            skipStore,
+            keepBinary,
+            remapCnt,
+            waitTopFut);
 
         assert subjId != null;
 
@@ -159,7 +172,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         if (res != null) {
             if (msgLog.isDebugEnabled()) {
                 msgLog.debug("Near update single fut, node left [futId=" + req.futureId() +
-                    ", writeVer=" + req.updateVersion() +
                     ", node=" + nodeId + ']');
             }
 
@@ -184,7 +196,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         GridCacheReturn ret = (GridCacheReturn)res;
 
         Object retval = res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ?
-                cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success();
+            cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success();
 
         if (op == TRANSFORM && retval == null)
             retval = Collections.emptyMap();
@@ -202,30 +214,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     }
 
     /** {@inheritDoc} */
-    @Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res) {
-        GridCacheReturn opRes0;
-        CachePartialUpdateCheckedException err0;
-        AffinityTopologyVersion remapTopVer0;
-
-        synchronized (mux) {
-            if (futId == null || futId != res.futureId())
-                return;
-
-            assert reqState != null;
-
-            if (reqState.onMappingReceived(cctx, res)) {
-                opRes0 = opRes;
-                err0 = err;
-                remapTopVer0 = onAllReceived();
-            }
-            else
-                return;
-        }
-
-        finishUpdateFuture(opRes0, err0, remapTopVer0);
-    }
-
-    /** {@inheritDoc} */
     @Override public void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
         GridCacheReturn opRes0;
         CachePartialUpdateCheckedException err0;
@@ -444,7 +432,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
         assert nearEnabled;
 
-        if (res.remapTopologyVersion() != null || !req.hasPrimary())
+        if (res.remapTopologyVersion() != null)
             return;
 
         GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
@@ -585,14 +573,14 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
         GridNearAtomicAbstractUpdateRequest req;
 
+        boolean stableTop = true;
+
         if (canUseSingleRequest()) {
             if (op == TRANSFORM) {
                 req = new GridNearAtomicSingleUpdateInvokeRequest(
                     cctx.cacheId(),
                     primary.id(),
                     futId,
-                    false,
-                    null,
                     topVer,
                     topLocked,
                     syncMode,
@@ -601,9 +589,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                     invokeArgs,
                     subjId,
                     taskNameHash,
+                    stableTop,
                     skipStore,
                     keepBinary,
-                    cctx.kernalContext().clientNode(),
                     cctx.deploymentEnabled());
             }
             else {
@@ -612,8 +600,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                         cctx.cacheId(),
                         primary.id(),
                         futId,
-                        false,
-                        null,
                         topVer,
                         topLocked,
                         syncMode,
@@ -621,9 +607,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                         retval,
                         subjId,
                         taskNameHash,
+                        stableTop,
                         skipStore,
                         keepBinary,
-                        cctx.kernalContext().clientNode(),
                         cctx.deploymentEnabled());
                 }
                 else {
@@ -631,8 +617,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                         cctx.cacheId(),
                         primary.id(),
                         futId,
-                        false,
-                        null,
                         topVer,
                         topLocked,
                         syncMode,
@@ -641,9 +625,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                         filter,
                         subjId,
                         taskNameHash,
+                        stableTop,
                         skipStore,
                         keepBinary,
-                        cctx.kernalContext().clientNode(),
                         cctx.deploymentEnabled());
                 }
             }
@@ -653,8 +637,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 cctx.cacheId(),
                 primary.id(),
                 futId,
-                false,
-                null,
                 topVer,
                 topLocked,
                 syncMode,
@@ -665,9 +647,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 filter,
                 subjId,
                 taskNameHash,
+                stableTop,
                 skipStore,
                 keepBinary,
-                cctx.kernalContext().clientNode(),
                 cctx.deploymentEnabled(),
                 1);
         }
@@ -676,8 +658,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             val,
             CU.TTL_NOT_CHANGED,
             CU.EXPIRE_TIME_CALCULATE,
-            null,
-            true);
+            null);
 
         return req;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
index 269443f..5199602 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
@@ -77,8 +77,6 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
      * @param cacheId Cache ID.
      * @param nodeId Node ID.
      * @param futId Future ID.
-     * @param fastMap Fast map scheme flag.
-     * @param updateVer Update version set if fast map is performed.
      * @param topVer Topology version.
      * @param topLocked Topology locked flag.
      * @param syncMode Synchronization mode.
@@ -89,15 +87,12 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
      * @param taskNameHash Task name hash code.
      * @param skipStore Skip write-through to a persistent storage.
      * @param keepBinary Keep binary flag.
-     * @param clientReq Client node request flag.
      * @param addDepInfo Deployment info flag.
      */
     GridNearAtomicSingleUpdateInvokeRequest(
         int cacheId,
         UUID nodeId,
         long futId,
-        boolean fastMap,
-        @Nullable GridCacheVersion updateVer,
         @NotNull AffinityTopologyVersion topVer,
         boolean topLocked,
         CacheWriteSynchronizationMode syncMode,
@@ -106,17 +101,15 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
         @Nullable Object[] invokeArgs,
         @Nullable UUID subjId,
         int taskNameHash,
+        boolean stableTop,
         boolean skipStore,
         boolean keepBinary,
-        boolean clientReq,
         boolean addDepInfo
     ) {
         super(
             cacheId,
             nodeId,
             futId,
-            fastMap,
-            updateVer,
             topVer,
             topLocked,
             syncMode,
@@ -124,9 +117,9 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
             retval,
             subjId,
             taskNameHash,
+            stableTop,
             skipStore,
             keepBinary,
-            clientReq,
             addDepInfo
         );
         this.invokeArgs = invokeArgs;
@@ -140,14 +133,12 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
      * @param conflictTtl Conflict TTL (optional).
      * @param conflictExpireTime Conflict expire time (optional).
      * @param conflictVer Conflict version (optional).
-     * @param primary If given key is primary on this mapping.
      */
     @Override public void addUpdateEntry(KeyCacheObject key,
         @Nullable Object val,
         long conflictTtl,
         long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer,
-        boolean primary) {
+        @Nullable GridCacheVersion conflictVer) {
         assert conflictTtl < 0 : conflictTtl;
         assert conflictExpireTime < 0 : conflictExpireTime;
         assert conflictVer == null : conflictVer;
@@ -156,9 +147,6 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
         entryProcessor = (EntryProcessor<Object, Object, Object>)val;
 
         this.key = key;
-        partId = key.partition();
-
-        hasPrimary(hasPrimary() | primary);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 577b130..42367b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -57,9 +57,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
     /** Value to update. */
     protected CacheObject val;
 
-    /** Partition of key. */
-    protected int partId;
-
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -73,8 +70,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
      * @param cacheId Cache ID.
      * @param nodeId Node ID.
      * @param futId Future ID.
-     * @param fastMap Fast map scheme flag.
-     * @param updateVer Update version set if fast map is performed.
      * @param topVer Topology version.
      * @param topLocked Topology locked flag.
      * @param syncMode Synchronization mode.
@@ -84,15 +79,12 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
      * @param taskNameHash Task name hash code.
      * @param skipStore Skip write-through to a persistent storage.
      * @param keepBinary Keep binary flag.
-     * @param clientReq Client node request flag.
      * @param addDepInfo Deployment info flag.
      */
     GridNearAtomicSingleUpdateRequest(
         int cacheId,
         UUID nodeId,
         long futId,
-        boolean fastMap,
-        @Nullable GridCacheVersion updateVer,
         @NotNull AffinityTopologyVersion topVer,
         boolean topLocked,
         CacheWriteSynchronizationMode syncMode,
@@ -100,17 +92,14 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
         boolean retval,
         @Nullable UUID subjId,
         int taskNameHash,
+        boolean stableTop,
         boolean skipStore,
         boolean keepBinary,
-        boolean clientReq,
         boolean addDepInfo
     ) {
-        super(
-            cacheId,
+        super(cacheId,
             nodeId,
             futId,
-            fastMap,
-            updateVer,
             topVer,
             topLocked,
             syncMode,
@@ -118,16 +107,17 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
             retval,
             subjId,
             taskNameHash,
+            stableTop,
             skipStore,
             keepBinary,
-            clientReq,
-            addDepInfo
-        );
+            addDepInfo);
     }
 
     /** {@inheritDoc} */
     @Override public int partition() {
-        return partId;
+        assert key != null;
+
+        return key.partition();
     }
 
     /**
@@ -136,14 +126,12 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
      * @param conflictTtl Conflict TTL (optional).
      * @param conflictExpireTime Conflict expire time (optional).
      * @param conflictVer Conflict version (optional).
-     * @param primary If given key is primary on this mapping.
      */
     @Override public void addUpdateEntry(KeyCacheObject key,
         @Nullable Object val,
         long conflictTtl,
         long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer,
-        boolean primary) {
+        @Nullable GridCacheVersion conflictVer) {
         assert op != TRANSFORM;
         assert val != null || op == DELETE;
         assert conflictTtl < 0 : conflictTtl;
@@ -151,15 +139,12 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
         assert conflictVer == null : conflictVer;
 
         this.key = key;
-        partId = key.partition();
 
         if (val != null) {
             assert val instanceof CacheObject : val;
 
             this.val = (CacheObject)val;
         }
-
-        hasPrimary(hasPrimary() | primary);
     }
 
     /** {@inheritDoc} */
@@ -255,8 +240,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
 
         if (val != null)
             val.finishUnmarshal(cctx.cacheObjectContext(), ldr);
-
-        key.partition(partId);
     }
 
     /** {@inheritDoc} */
@@ -280,12 +263,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
 
                 writer.incrementState();
 
-            case 12:
-                if (!writer.writeInt("partId", partId))
-                    return false;
-
-                writer.incrementState();
-
             case 13:
                 if (!writer.writeMessage("val", val))
                     return false;
@@ -316,14 +293,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
 
                 reader.incrementState();
 
-            case 12:
-                partId = reader.readInt("partId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
             case 13:
                 val = reader.readMessage("val");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 573cb40..629c447 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -233,7 +233,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         if (msgLog.isDebugEnabled()) {
             msgLog.debug("Near update fut, node left [futId=" + req.futureId() +
-                ", writeVer=" + req.updateVersion() +
                 ", node=" + nodeId + ']');
         }
 
@@ -284,51 +283,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     }
 
     /** {@inheritDoc} */
-    @Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res) {
-        GridCacheReturn opRes0;
-        CachePartialUpdateCheckedException err0;
-        AffinityTopologyVersion remapTopVer0;
-
-        synchronized (mux) {
-            if (futId == null || futId != res.futureId())
-                return;
-
-            PrimaryRequestState reqState;
-
-            if (singleReq != null) {
-                if (singleReq.onMappingReceived(cctx, res)) {
-                    opRes0 = opRes;
-                    err0 = err;
-                    remapTopVer0 = onAllReceived();
-                }
-                else
-                    return;
-            }
-            else {
-                reqState = mappings != null ? mappings.get(nodeId) : null;
-
-                if (reqState != null && reqState.onMappingReceived(cctx, res)) {
-                    assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
-
-                    resCnt++;
-
-                    if (mappings.size() == resCnt) {
-                        opRes0 = opRes;
-                        err0 = err;
-                        remapTopVer0 = onAllReceived();
-                    }
-                    else
-                        return;
-                }
-                else
-                    return;
-            }
-        }
-
-        finishUpdateFuture(opRes0, err0, remapTopVer0);
-    }
-
-    /** {@inheritDoc} */
     @Override public void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
         GridCacheReturn opRes0;
         CachePartialUpdateCheckedException err0;
@@ -756,14 +710,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
                     if (msgLog.isDebugEnabled()) {
                         msgLog.debug("Near update fut, sent request [futId=" + req.futureId() +
-                            ", writeVer=" + req.updateVersion() +
                             ", node=" + req.nodeId() + ']');
                     }
                 }
                 catch (IgniteCheckedException e) {
                     if (msgLog.isDebugEnabled()) {
                         msgLog.debug("Near update fut, failed to send request [futId=" + req.futureId() +
-                            ", writeVer=" + req.updateVersion() +
                             ", node=" + req.nodeId() +
                             ", err=" + e + ']');
                     }
@@ -817,7 +769,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             if (size == 1) {
                 assert remapKeys == null || remapKeys.size() == 1;
 
-                singleReq0 = mapSingleUpdate(topVer, futId, null);
+                singleReq0 = mapSingleUpdate(topVer, futId);
             }
             else {
                 Map<UUID, PrimaryRequestState> pendingMappings = mapUpdate(topNodes,
@@ -923,6 +875,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         Map<UUID, PrimaryRequestState> pendingMappings = U.newHashMap(topNodes.size());
 
+        boolean stableTop = true;
+
         // Create mappings first, then send messages.
         for (Object key : keys) {
             if (key == null)
@@ -991,8 +945,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     cctx.cacheId(),
                     nodeId,
                     futId,
-                    false,
-                    null,
                     topVer,
                     topLocked,
                     syncMode,
@@ -1003,16 +955,16 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     filter,
                     subjId,
                     taskNameHash,
+                    stableTop,
                     skipStore,
                     keepBinary,
-                    cctx.kernalContext().clientNode(),
                     cctx.deploymentEnabled(),
                     keys.size()));
 
                 pendingMappings.put(nodeId, mapped);
             }
 
-            mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, true);
+            mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer);
         }
 
         return pendingMappings;
@@ -1021,13 +973,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     /**
      * @param topVer Topology version.
      * @param futId Future ID.
-     * @param updVer Update version.
      * @return Request.
      * @throws Exception If failed.
      */
-    private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer,
-        Long futId,
-        @Nullable GridCacheVersion updVer) throws Exception {
+    private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, Long futId) throws Exception {
         Object key = F.first(keys);
 
         Object val;
@@ -1086,12 +1035,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
                 "left the grid).");
 
+        boolean stableTop = true;
+
         GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
             cctx.cacheId(),
             primary.id(),
             futId,
-            false,
-            updVer,
             topVer,
             topLocked,
             syncMode,
@@ -1102,9 +1051,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             filter,
             subjId,
             taskNameHash,
+            stableTop,
             skipStore,
             keepBinary,
-            cctx.kernalContext().clientNode(),
             cctx.deploymentEnabled(),
             1);
 
@@ -1112,8 +1061,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             val,
             conflictTtl,
             conflictExpireTime,
-            conflictVer,
-            true);
+            conflictVer);
 
         return new PrimaryRequestState(req);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 76f2ea0..6bb8913 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -104,10 +104,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     /** Near expire times. */
     private GridLongList nearExpireTimes;
 
-    /** */
-    @GridDirectCollection(UUID.class)
-    private List<UUID> mapping;
-
     /** Partition ID. */
     private int partId = -1;
 
@@ -131,20 +127,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
         this.addDepInfo = addDepInfo;
     }
 
-    /**
-     * @return Update mapping.
-     */
-    public List<UUID> mapping() {
-        return mapping;
-    }
-
-    /**
-     * @param mapping Mapping.
-     */
-    public void mapping(List<UUID> mapping) {
-        this.mapping = mapping;
-    }
-
     /** {@inheritDoc} */
     @Override public int lookupIndex() {
         return CACHE_MSG_IDX;
@@ -499,12 +481,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
                 writer.incrementState();
 
-            case 6:
-                if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
-                    return false;
-
-                writer.incrementState();
-
             case 7:
                 if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
                     return false;
@@ -599,14 +575,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
                 reader.incrementState();
 
-            case 6:
-                mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
             case 7:
                 nearExpireTimes = reader.readMessage("nearExpireTimes");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index f8ae661..62aecd1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -142,10 +142,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
         List<Integer> nearValsIdxs = res.nearValuesIndexes();
         List<Integer> skipped = res.skippedIndexes();
 
-        GridCacheVersion ver = req.updateVersion();
-
-        if (ver == null)
-            ver = res.nearVersion();
+        GridCacheVersion ver = res.nearVersion();
 
         assert ver != null : "Failed to find version [req=" + req + ", res=" + res + ']';
 
@@ -195,7 +192,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                 processNearAtomicUpdateResponse(ver,
                     key,
                     val,
-                    null,
                     ttl,
                     expireTime,
                     req.keepBinary(),
@@ -213,7 +209,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
      * @param ver Version.
      * @param key Key.
      * @param val Value.
-     * @param valBytes Value bytes.
      * @param ttl TTL.
      * @param expireTime Expire time.
      * @param nodeId Node ID.
@@ -225,7 +220,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
         GridCacheVersion ver,
         KeyCacheObject key,
         @Nullable CacheObject val,
-        @Nullable byte[] valBytes,
         long ttl,
         long expireTime,
         boolean keepBinary,
@@ -242,7 +236,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                 try {
                     entry = entryEx(key, topVer);
 
-                    GridCacheOperation op = (val != null || valBytes != null) ? UPDATE : DELETE;
+                    GridCacheOperation op = val != null ? UPDATE : DELETE;
 
                     GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                         ver,

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index cbb2b8a..081e49f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -402,9 +402,6 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
 
         assertEquals(3, msgs.size());
 
-        for (Object msg : msgs)
-            assertTrue(((GridNearAtomicFullUpdateRequest)msg).clientRequest());
-
         map.put(primaryKey(ignite0.cache(null)), 3);
         map.put(primaryKey(ignite1.cache(null)), 4);
         map.put(primaryKey(ignite2.cache(null)), 5);


[2/2] ignite git commit: ignite-4705

Posted by sb...@apache.org.
ignite-4705


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/30bfae0e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/30bfae0e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/30bfae0e

Branch: refs/heads/ignite-4705-2
Commit: 30bfae0e6d5e8ae5098771d6754e4fa24becbd13
Parents: 19c340c
Author: sboikov <sb...@gridgain.com>
Authored: Thu Mar 2 14:47:21 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Mar 2 18:34:19 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   8 +-
 .../processors/cache/GridCacheIoManager.java    |   4 +-
 .../processors/cache/GridCacheMvccManager.java  |   2 +-
 .../GridDhtAtomicAbstractUpdateFuture.java      | 219 +++----
 .../GridDhtAtomicAbstractUpdateRequest.java     |  54 +-
 .../dht/atomic/GridDhtAtomicCache.java          | 269 ++-------
 .../dht/atomic/GridDhtAtomicNearResponse.java   | 115 +---
 .../atomic/GridDhtAtomicSingleUpdateFuture.java |  16 +-
 .../GridDhtAtomicSingleUpdateRequest.java       |   4 -
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  14 +-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |   4 -
 .../dht/atomic/GridDhtAtomicUpdateResponse.java |  35 +-
 ...idNearAtomicAbstractSingleUpdateRequest.java | 475 +--------------
 .../GridNearAtomicAbstractUpdateFuture.java     | 107 ++--
 .../GridNearAtomicAbstractUpdateRequest.java    | 285 +++++++--
 .../atomic/GridNearAtomicFullUpdateRequest.java | 594 +------------------
 .../atomic/GridNearAtomicMappingResponse.java   | 244 --------
 ...GridNearAtomicSingleUpdateFilterRequest.java |  11 +-
 .../GridNearAtomicSingleUpdateFuture.java       |  67 +--
 ...GridNearAtomicSingleUpdateInvokeRequest.java |  18 +-
 .../GridNearAtomicSingleUpdateRequest.java      |  47 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  72 +--
 .../atomic/GridNearAtomicUpdateResponse.java    |  32 -
 .../distributed/near/GridNearAtomicCache.java   |  10 +-
 ...niteCacheClientNodeChangingTopologyTest.java |   3 -
 25 files changed, 568 insertions(+), 2141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 908d1d6..fa59291 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -72,7 +72,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMappingResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
@@ -121,11 +120,11 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopDirectShuffleMessage;
 import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleAck;
 import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishRequest;
 import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishResponse;
 import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage;
-import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopDirectShuffleMessage;
 import org.apache.ignite.internal.processors.igfs.IgfsAckMessage;
 import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
 import org.apache.ignite.internal.processors.igfs.IgfsBlocksMessage;
@@ -176,11 +175,6 @@ public class GridIoMessageFactory implements MessageFactory {
         Message msg = null;
 
         switch (type) {
-            case -47:
-                msg = new GridNearAtomicMappingResponse();
-
-                break;
-
             case -46:
                 msg = new UpdateErrors();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index c46b01a..d1a6753 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -494,9 +494,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      * @return Atomic future ID if applicable for message.
      */
     @Nullable private GridCacheVersion atomicWriteVersion(GridCacheMessage cacheMsg) {
-        if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest)
-            return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).updateVersion();
-        else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest)
+        if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest)
             return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).writeVersion();
 
         return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index f3d3fb6..7b87769 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -136,7 +136,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     private volatile boolean stopping;
 
     /** */
-    private final AtomicLong atomicFutId = new AtomicLong();
+    private final AtomicLong atomicFutId = new AtomicLong(U.currentTimeMillis());
 
     /** Lock callback. */
     @GridToStringExclude

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 094d643..96f3611 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -52,7 +51,6 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 
 /**
  * DHT atomic cache backup update future.
@@ -78,7 +76,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     protected final GridCacheContext cctx;
 
     /** Future version. */
-    protected final long futId;
+    protected final Long futId;
 
     /** Update request. */
     final GridNearAtomicAbstractUpdateRequest updateReq;
@@ -94,25 +92,31 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     private volatile int resCnt;
 
     /** */
-    private boolean repliedToNear;
+    private final GridNearAtomicUpdateResponse updateRes;
 
     /** */
-    private boolean affMapping = true;
+    private final GridDhtAtomicCache.UpdateReplyClosure completionCb;
 
     /**
      * @param cctx Cache context.
      * @param writeVer Write version.
      * @param updateReq Update request.
+     * @param updateRes Response.
+     * @param completionCb Callback to invoke to send response to near node.
      */
     protected GridDhtAtomicAbstractUpdateFuture(
         GridCacheContext cctx,
         GridCacheVersion writeVer,
-        GridNearAtomicAbstractUpdateRequest updateReq
+        GridNearAtomicAbstractUpdateRequest updateReq,
+        GridNearAtomicUpdateResponse updateRes,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb
     ) {
         this.cctx = cctx;
 
         this.updateReq = updateReq;
         this.writeVer = writeVer;
+        this.updateRes = updateRes;
+        this.completionCb = completionCb;
 
         futId = cctx.mvcc().atomicFutureId();
 
@@ -145,7 +149,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     }
 
     /**
-     * @param nearNodeId Near node ID.
      * @param entry Entry to map.
      * @param val Value to write.
      * @param entryProcessor Entry processor.
@@ -159,7 +162,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     @SuppressWarnings("ForLoopReplaceableByForEach")
     final void addWriteEntry(
         AffinityAssignment affAssignment,
-        UUID nearNodeId,
         GridDhtCacheEntry entry,
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
@@ -175,9 +177,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
         List<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), affAssignment, affNodes);
 
-        if (dhtNodes != null)
-            affMapping = false;
-        else
+        if (dhtNodes == null)
             dhtNodes = affNodes;
 
         if (log.isDebugEnabled())
@@ -198,7 +198,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                 if (updateReq == null) {
                     updateReq = createRequest(
                         node.id(),
-                        nearNodeId,
                         futId,
                         writeVer,
                         syncMode,
@@ -236,7 +235,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     protected abstract void addNearKey(KeyCacheObject key, Collection<UUID> readers);
 
     /**
-     * @param nearNodeId Near node ID.
      * @param readers Entry readers.
      * @param entry Entry.
      * @param val Value.
@@ -245,15 +243,12 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      * @param expireTime Expire time for near cache update (optional).
      */
     final void addNearWriteEntries(
-        UUID nearNodeId,
         Collection<UUID> readers,
         GridDhtCacheEntry entry,
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
         long expireTime) {
-        affMapping = false;
-
         CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
 
         addNearKey(entry.key(), readers);
@@ -272,7 +267,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
                 updateReq = createRequest(
                     node.id(),
-                    nearNodeId,
                     futId,
                     writeVer,
                     syncMode,
@@ -332,14 +326,11 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
         GridDhtAtomicAbstractUpdateRequest req = mappings != null ? mappings.get(nodeId) : null;
 
-        boolean needReplyToNear = false;
+//        boolean needReplyToNear = false;
 
         if (req != null) {
             synchronized (this) {
                 if (req.onResponse()) {
-                    if (errors != null || (nodeErr && !repliedToNear))
-                        needReplyToNear = repliedToNear = true;
-
                     resCnt0 = resCnt;
 
                     resCnt0 += 1;
@@ -349,53 +340,50 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                 else
                     return false;
             }
-
-            if (needReplyToNear) {
-                assert !F.isEmpty(mappings);
-
-                List<UUID> dhtNodes = new ArrayList<>(mappings.size());
-
-                dhtNodes.addAll(mappings.keySet());
-
-                GridDhtAtomicNearResponse res = new GridDhtAtomicNearResponse(cctx.cacheId(),
-                    req.partition(),
-                    req.nearFutureId(),
-                    cctx.localNodeId(),
-                    dhtNodes,
-                    req.flags());
-
-                res.errors(errors);
-
-                res.failedNodeId(nodeId);
-
-                try {
-                    cctx.io().send(req.nearNodeId(), res, cctx.ioPolicy());
-
-                    if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("DTH update fut, sent response on DHT node fail " +
-                            "[futId=" + futId +
-                            ", writeVer=" + writeVer +
-                            ", node=" + req.nearNodeId() +
-                            ", failedNode=" + nodeId + ']');
-                    }
-                }
-                catch (ClusterTopologyCheckedException ignored) {
-                    if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("DTH update fut, failed to notify near node on DHT node fail, near node left " +
-                            "[futId=" + futId +
-                            ", writeVer=" + writeVer +
-                            ", node=" + req.nearNodeId() +
-                            ", failedNode=" + nodeId + ']');
-                    }
-                }
-                catch (IgniteCheckedException ignored) {
-                    U.error(msgLog, "DTH update fut, failed to notify near node on DHT node fail " +
-                        "[futId=" + futId +
-                        ", writeVer=" + writeVer +
-                        ", node=" + req.nearNodeId() +
-                        ", failedNode=" + nodeId + ']');
-                }
-            }
+//
+//            if (needReplyToNear) {
+//                assert !F.isEmpty(mappings);
+//
+//                List<UUID> dhtNodes = new ArrayList<>(mappings.size());
+//
+//                dhtNodes.addAll(mappings.keySet());
+//
+//                GridDhtAtomicNearResponse res = new GridDhtAtomicNearResponse(cctx.cacheId(),
+//                    req.partition(),
+//                    req.nearFutureId(),
+//                    cctx.localNodeId(),
+//                    req.flags());
+//
+//                res.errors(errors);
+//
+//                try {
+//                    cctx.io().send(req.nearNodeId(), res, cctx.ioPolicy());
+//
+//                    if (msgLog.isDebugEnabled()) {
+//                        msgLog.debug("DTH update fut, sent response on DHT node fail " +
+//                            "[futId=" + futId +
+//                            ", writeVer=" + writeVer +
+//                            ", node=" + req.nearNodeId() +
+//                            ", failedNode=" + nodeId + ']');
+//                    }
+//                }
+//                catch (ClusterTopologyCheckedException ignored) {
+//                    if (msgLog.isDebugEnabled()) {
+//                        msgLog.debug("DTH update fut, failed to notify near node on DHT node fail, near node left " +
+//                            "[futId=" + futId +
+//                            ", writeVer=" + writeVer +
+//                            ", node=" + req.nearNodeId() +
+//                            ", failedNode=" + nodeId + ']');
+//                    }
+//                }
+//                catch (IgniteCheckedException ignored) {
+//                    U.error(msgLog, "DTH update fut, failed to notify near node on DHT node fail " +
+//                        "[futId=" + futId +
+//                        ", writeVer=" + writeVer +
+//                        ", node=" + req.nearNodeId() +
+//                        ", failedNode=" + nodeId + ']');
+//                }
+//            }
 
             if (resCnt0 == mappings.size())
                 onDone();
@@ -406,97 +394,49 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         return false;
     }
 
-    final void init(GridNearAtomicUpdateResponse updateRes, GridCacheReturn ret) {
-        repliedToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC ||
-            ret.hasValue() ||
-            updateRes.nearVersion() != null ||
-            updateRes.nodeId().equals(cctx.localNodeId());
-    }
-
     /**
      * Sends requests to remote nodes.
      *
-     * @param updateRes Response.
-     * @param completionCb Callback to invoke to send response to near node.
      * @param ret Cache operation return value.
      */
-    final void map(GridNearAtomicUpdateResponse updateRes,
-        GridDhtAtomicCache.UpdateReplyClosure completionCb,
-        GridCacheReturn ret) {
-        boolean fullSync = updateReq.writeSynchronizationMode() == FULL_SYNC;
-
-        boolean needReplyToNear = repliedToNear;
+    final void map(ClusterNode nearNode, GridCacheReturn ret) {
+        if (F.isEmpty(mappings)) {
+            completionCb.apply(updateReq, updateRes);
 
-        List<UUID> dhtNodes = null;
+            onDone();
 
-        boolean affMapping = false;
+            return;
+        }
 
-        if (fullSync) {
-            if (!F.isEmpty(mappings)) {
-                if (updateReq.size() == 1 && this.affMapping)
-                    affMapping = true;
-                else {
-                    dhtNodes = new ArrayList<>(mappings.size());
+        boolean fullSync = updateReq.writeSynchronizationMode() == FULL_SYNC;
 
-                    dhtNodes.addAll(mappings.keySet());
-                }
-            }
-            else
-                dhtNodes = Collections.emptyList();
-        }
-        else
-            dhtNodes = Collections.emptyList();
+        if (updateReq.dhtReplyToNear()) {
+            assert fullSync;
 
-        updateRes.mapping(dhtNodes);
+            boolean needReplyToNear = ret.hasValue() || updateRes.nearVersion() != null;
 
-        if (!F.isEmpty(mappings)) {
-            sendDhtRequests(fullSync && !needReplyToNear, dhtNodes, affMapping, ret);
+            sendDhtRequests(true, nearNode, ret);
 
             if (needReplyToNear)
                 completionCb.apply(updateReq, updateRes);
-            else {
-                if (fullSync && GridDhtAtomicCache.IGNITE_ATOMIC_SND_MAPPING_TO_NEAR) {
-                    GridNearAtomicMappingResponse mappingRes = new GridNearAtomicMappingResponse(
-                        cctx.cacheId(),
-                        updateReq.partition(),
-                        updateReq.futureId(),
-                        dhtNodes,
-                        affMapping);
-
-                    try {
-                        cctx.io().send(updateRes.nodeId(), mappingRes, cctx.ioPolicy());
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(msgLog, "Failed to send mapping response [futId=" + futId +
-                            ", writeVer=" + writeVer +
-                            ", node=" + updateRes.nodeId() + ']');
-                    }
-                }
-            }
         }
         else {
-            completionCb.apply(updateReq, updateRes);
+            sendDhtRequests(false, nearNode, ret);
 
-            onDone();
+            if (!fullSync)
+                completionCb.apply(updateReq, updateRes);
         }
     }
 
     /**
      * @param nearReplyInfo {@code True} if need add information for near node response.
-     * @param dhtNodes DHT nodes.
      * @param ret Return value.
      */
-    private void sendDhtRequests(boolean nearReplyInfo, List<UUID> dhtNodes, boolean affMapping, GridCacheReturn ret) {
+    private void sendDhtRequests(boolean nearReplyInfo, ClusterNode nearNode, GridCacheReturn ret) {
         for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) {
             try {
                 if (nearReplyInfo) {
-                    if (affMapping) {
-                        assert dhtNodes == null;
-
-                        req.affinityMapping(true);
-                    }
-                    else
-                        req.dhtNodes(dhtNodes);
+                    req.nearReplyInfo(nearNode.id(), updateReq.futureId());
 
                     if (!ret.hasValue())
                         req.setResult(ret.success());
@@ -549,8 +489,13 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     }
 
     /**
+     * @param updateRes Response.
+     * @param err Error.
+     */
+    protected abstract void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err);
+
+    /**
      * @param nodeId Node ID.
-     * @param nearNodeId Near node ID.
      * @param futId Future ID.
      * @param writeVer Update version.
      * @param syncMode Write synchronization mode.
@@ -562,7 +507,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      */
     protected abstract GridDhtAtomicAbstractUpdateRequest createRequest(
         UUID nodeId,
-        UUID nearNodeId,
         long futId,
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
@@ -584,6 +528,13 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                     clsr.apply(suc);
             }
 
+            if (updateReq.writeSynchronizationMode() == FULL_SYNC && !updateReq.dhtReplyToNear()) {
+                if (!suc)
+                    addFailedKeys(updateRes, err);
+
+                completionCb.apply(updateReq, updateRes);
+            }
+
             return true;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index 896c163..e04e381 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -58,12 +58,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
     /** */
     static final int DHT_ATOMIC_RESULT_SUCCESS_MASK = 0x10;
 
-    /** */
-    static final int DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE = 0x20;
-
-    /** */
-    static final int DHT_ATOMIC_AFF_MAPPING_FLAG_MASK = 0x40;
-
     /** Message index. */
     public static final int CACHE_MSG_IDX = nextIndexId();
 
@@ -102,10 +96,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
     /** Additional flags. */
     protected byte flags;
 
-    /** */
-    @GridDirectCollection(UUID.class)
-    private List<UUID> dhtNodes;
-
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -118,14 +108,10 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
      *
      * @param cacheId Cache ID.
      * @param nodeId Node ID.
-     * @param nearNodeId Near node ID.
-     * @param nearFutId Future ID on near node.
      */
     protected GridDhtAtomicAbstractUpdateRequest(int cacheId,
         UUID nodeId,
         long futId,
-        UUID nearNodeId,
-        long nearFutId,
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
         @NotNull AffinityTopologyVersion topVer,
@@ -138,8 +124,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
         this.cacheId = cacheId;
         this.nodeId = nodeId;
         this.futId = futId;
-        this.nearNodeId = nearNodeId;
-        this.nearFutId = nearFutId;
         this.writeVer = writeVer;
         this.syncMode = syncMode;
         this.topVer = topVer;
@@ -153,11 +137,11 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
             setFlag(true, DHT_ATOMIC_KEEP_BINARY_FLAG_MASK);
     }
 
-    /**
-     * @param affMapping
-     */
-    public void affinityMapping(boolean affMapping) {
-        setFlag(affMapping, DHT_ATOMIC_AFF_MAPPING_FLAG_MASK);
+    void nearReplyInfo(UUID nearNodeId, long nearFutId) {
+        assert nearNodeId != null;
+
+        this.nearNodeId = nearNodeId;
+        this.nearFutId = nearFutId;
     }
 
     /**
@@ -176,20 +160,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
         return nearNodeId;
     }
 
-    /**
-     * @param dhtNodes DHT nodes.
-     */
-    void dhtNodes(List<UUID> dhtNodes) {
-        this.dhtNodes = dhtNodes;
-    }
-
-    /**
-     * @return DHT nodes.
-     */
-    List<UUID> dhtNodes() {
-        return dhtNodes;
-    }
-
     /** {@inheritDoc} */
     @Override public int lookupIndex() {
         return CACHE_MSG_IDX;
@@ -478,12 +448,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
         }
 
         switch (writer.state()) {
-            case 3:
-                if (!writer.writeCollection("dhtNodes", dhtNodes, MessageCollectionItemType.UUID))
-                    return false;
-
-                writer.incrementState();
-
             case 4:
                 if (!writer.writeByte("flags", flags))
                     return false;
@@ -554,14 +518,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
             return false;
 
         switch (reader.state()) {
-            case 3:
-                dhtNodes = reader.readCollection("dhtNodes", MessageCollectionItemType.UUID);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
             case 4:
                 flags = reader.readByte("flags");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index a58f1ca..f5d06dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -34,7 +34,6 @@ import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
@@ -93,14 +92,12 @@ import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityPermission;
@@ -138,14 +135,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500);
 
     /** */
-    static final boolean IGNITE_ATOMIC_SND_MAPPING_TO_NEAR =
-        IgniteSystemProperties.getBoolean("IGNITE_ATOMIC_SND_MAPPING_TO_NEAR", false);
-
-    /** */
-    private static final boolean IGNITE_ATOMIC_DHT_REPLY_AFTER_ACK =
-        IgniteSystemProperties.getBoolean("IGNITE_ATOMIC_DHT_REPLY_AFTER_ACK", true);
-
-    /** */
     private final ThreadLocal<Map<UUID, GridDhtAtomicDeferredUpdateResponse>> defRes =
         new ThreadLocal<Map<UUID, GridDhtAtomicDeferredUpdateResponse>>() {
             @Override protected Map<UUID, GridDhtAtomicDeferredUpdateResponse> initialValue() {
@@ -244,12 +233,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
-        // TODO IGNITE-4705.
-        log.info("Atomic cache start [name=" + name() +
-            ", mode=" + configuration().getWriteSynchronizationMode() +
-            ", IGNITE_ATOMIC_SND_MAPPING_TO_NEAR=" + IGNITE_ATOMIC_SND_MAPPING_TO_NEAR +
-            ", IGNITE_ATOMIC_DHT_REPLY_AFTER_ACK=" + IGNITE_ATOMIC_DHT_REPLY_AFTER_ACK + ']');
-
         CacheMetricsImpl m = new CacheMetricsImpl(ctx);
 
         if (ctx.dht().near() != null)
@@ -395,19 +378,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(),
-            GridNearAtomicMappingResponse.class,
-            new CI2<UUID, GridNearAtomicMappingResponse>() {
-                @Override public void apply(UUID uuid, GridNearAtomicMappingResponse msg) {
-                    processDhtAtomicNearMappingResponse(uuid, msg);
-                }
-
-                @Override public String toString() {
-                    return "GridNearAtomicMappingResponse handler " +
-                        "[msgIdx=" + GridNearAtomicMappingResponse.CACHE_MSG_IDX + ']';
-                }
-            });
-
         if (near == null) {
             ctx.io().addHandler(
                 ctx.cacheId(),
@@ -1813,7 +1783,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
 
-                        dhtFut = createDhtFuture(ver, req);
+                        dhtFut = createDhtFuture(ver, req, res, completionCb);
 
                         expiry = expiryPolicy(req.expiry());
 
@@ -1867,11 +1837,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         res.returnValue(retVal);
 
-                        if (dhtFut != null) {
-                            dhtFut.init(res, res.returnValue());
-
+                        if (dhtFut != null)
                             ctx.mvcc().addAtomicFuture(dhtFut.id(), dhtFut);
-                        }
                     }
                     else {
                         // Should remap all keys.
@@ -1904,8 +1871,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
         catch (GridDhtInvalidPartitionException ignore) {
-            assert !req.fastMap() || req.clientRequest() : req;
-
             if (log.isDebugEnabled())
                 log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req);
 
@@ -1936,7 +1901,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         else {
             // If there are backups, map backup update future.
             if (dhtFut != null) {
-                dhtFut.map(res, completionCb, res.returnValue());
+                dhtFut.map(node, res.returnValue());
                 // Otherwise, complete the call.
             }
             else
@@ -2469,7 +2434,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     dhtFut);
 
                 if (dhtFut == null && !F.isEmpty(filteredReaders)) {
-                    dhtFut = createDhtFuture(ver, req);
+                    dhtFut = createDhtFuture(ver, req, null, null); // TODO IGNITE-4705.
 
                     readersOnly = true;
                 }
@@ -2488,7 +2453,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (!readersOnly) {
                             dhtFut.addWriteEntry(
                                 affAssignment,
-                                nearNode.id(),
                                 entry,
                                 updRes.newValue(),
                                 entryProcessor,
@@ -2502,7 +2466,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         if (!F.isEmpty(filteredReaders))
                             dhtFut.addNearWriteEntries(
-                                nearNode.id(),
                                 filteredReaders,
                                 entry,
                                 updRes.newValue(),
@@ -2644,17 +2607,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             GridCacheOperation op;
 
             if (putMap != null) {
-                // If fast mapping, filter primary keys for write to store.
-                Map<KeyCacheObject, CacheObject> storeMap = req.fastMap() ?
-                    F.view(putMap, new P1<CacheObject>() {
-                        @Override public boolean apply(CacheObject key) {
-                            return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion());
-                        }
-                    }) :
-                    putMap;
-
                 try {
-                    ctx.store().putAll(null, F.viewReadOnly(storeMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() {
+                    ctx.store().putAll(null, F.viewReadOnly(putMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() {
                         @Override public IgniteBiTuple<CacheObject, GridCacheVersion> apply(CacheObject v) {
                             return F.t(v, ver);
                         }
@@ -2667,17 +2621,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 op = UPDATE;
             }
             else {
-                // If fast mapping, filter primary keys for write to store.
-                Collection<KeyCacheObject> storeKeys = req.fastMap() ?
-                    F.view(rmvKeys, new P1<Object>() {
-                        @Override public boolean apply(Object key) {
-                            return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion());
-                        }
-                    }) :
-                    rmvKeys;
-
                 try {
-                    ctx.store().removeAll(null, storeKeys);
+                    ctx.store().removeAll(null, rmvKeys);
                 }
                 catch (CacheStorePartialUpdateException e) {
                     storeErr = e;
@@ -2712,10 +2657,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                     assert writeVal != null || op == DELETE : "null write value found.";
 
-                    boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(),
-                        entry.partition(),
-                        req.topologyVersion());
-
                     Collection<UUID> readers = null;
                     Collection<UUID> filteredReaders = null;
 
@@ -2738,11 +2679,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         expiry,
                         /*event*/true,
                         /*metrics*/true,
-                        primary,
+                        /*primary*/true,
                         /*verCheck*/false,
                         topVer,
                         null,
-                        replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
+                        replicate ? DR_PRIMARY : DR_NONE,
                         CU.TTL_NOT_CHANGED,
                         CU.EXPIRE_TIME_CALCULATE,
                         null,
@@ -2777,7 +2718,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     batchRes.addDeleted(entry, updRes, entries);
 
                     if (dhtFut == null && !F.isEmpty(filteredReaders)) {
-                        dhtFut = createDhtFuture(ver, req);
+                        dhtFut = createDhtFuture(ver, req, null, null); // TODO IGNITE-4705.
 
                         batchRes.readersOnly(true);
                     }
@@ -2789,7 +2730,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (!batchRes.readersOnly()) {
                             dhtFut.addWriteEntry(
                                 affAssignment,
-                                nearNode.id(),
                                 entry,
                                 writeVal,
                                 entryProcessor,
@@ -2803,7 +2743,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         if (!F.isEmpty(filteredReaders))
                             dhtFut.addNearWriteEntries(
-                                nearNode.id(),
                                 filteredReaders,
                                 entry,
                                 writeVal,
@@ -2813,30 +2752,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     }
 
                     if (hasNear) {
-                        if (primary) {
-                            if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
-                                int idx = firstEntryIdx + i;
-
-                                if (req.operation() == TRANSFORM) {
-                                    res.addNearValue(idx,
-                                        writeVal,
-                                        updRes.newTtl(),
-                                        CU.EXPIRE_TIME_CALCULATE);
-                                }
-                                else
-                                    res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
-
-                                if (writeVal != null || entry.hasValue()) {
-                                    IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
+                        if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
+                            int idx = firstEntryIdx + i;
 
-                                    assert f == null : f;
-                                }
+                            if (req.operation() == TRANSFORM) {
+                                res.addNearValue(idx,
+                                    writeVal,
+                                    updRes.newTtl(),
+                                    CU.EXPIRE_TIME_CALCULATE);
                             }
-                            else if (readers.contains(nearNode.id())) // Reader became primary or backup.
-                                entry.removeReader(nearNode.id(), req.messageId());
                             else
-                                res.addSkippedIndex(firstEntryIdx + i);
+                                res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
+
+                            if (writeVal != null || entry.hasValue()) {
+                                IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
+
+                                assert f == null : f;
+                            }
                         }
+                        else if (readers.contains(nearNode.id())) // Reader became primary or backup.
+                            entry.removeReader(nearNode.id(), req.messageId());
                         else
                             res.addSkippedIndex(firstEntryIdx + i);
                     }
@@ -3083,12 +3018,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     @Nullable private GridDhtAtomicAbstractUpdateFuture createDhtFuture(
         GridCacheVersion writeVer,
-        GridNearAtomicAbstractUpdateRequest updateReq
+        GridNearAtomicAbstractUpdateRequest updateReq,
+        GridNearAtomicUpdateResponse updateRes,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb
     ) {
         if (updateReq.size() == 1)
-            return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq);
+            return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq, updateRes, completionCb);
         else
-            return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq);
+            return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq, updateRes, completionCb);
     }
 
     /**
@@ -3098,7 +3035,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
         if (msgLog.isDebugEnabled()) {
             msgLog.debug("Received near atomic update request [futId=" + req.futureId() +
-                ", writeVer=" + req.updateVersion() +
                 ", node=" + nodeId + ']');
         }
 
@@ -3143,13 +3079,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         GridCacheVersion ver = req.writeVersion();
 
-        GridDhtAtomicNearResponse nearRes = ctx.config().getWriteSynchronizationMode() == FULL_SYNC ?
-            new GridDhtAtomicNearResponse(ctx.cacheId(),
+        GridDhtAtomicUpdateResponse dhtRes = null;
+        GridDhtAtomicNearResponse nearRes = null;
+
+        if (req.nearNodeId() != null) {
+            nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(),
                 req.partition(),
                 req.nearFutureId(),
                 nodeId,
-                req.dhtNodes(),
-                req.flags()) : null;
+                req.flags());
+        }
+        else if (req.writeSynchronizationMode() == FULL_SYNC) {
+            dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(),
+                req.partition(),
+                req.futureId(),
+                ctx.deploymentEnabled());
+        }
 
         boolean replicate = ctx.isDrEnabled();
 
@@ -3233,43 +3178,45 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 // Ignore.
             }
             catch (IgniteCheckedException e) {
+                IgniteCheckedException err =
+                    new IgniteCheckedException("Failed to update key on backup node: " + key, e);
+
                 if (nearRes != null)
-                    nearRes.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e));
+                    nearRes.addFailedKey(key, err);
+                else if (dhtRes != null)
+                    dhtRes.addFailedKey(key, err);
 
                 U.error(log, "Failed to update key on backup node: " + key, e);
             }
         }
 
-        GridDhtAtomicUpdateResponse dhtRes = null;
-
         if (isNearEnabled(cacheCfg)) {
             List<KeyCacheObject> nearEvicted =
                 ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, nearRes);
 
             if (nearEvicted != null) {
-                dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(),
-                    req.partition(),
-                    req.futureId(),
-                    ctx.deploymentEnabled());
+                if (dhtRes == null) {
+                    dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(),
+                        req.partition(),
+                        req.futureId(),
+                        ctx.deploymentEnabled());
+                }
 
                 dhtRes.nearEvicted(nearEvicted);
             }
         }
 
         if (nearRes != null) {
-            if (IGNITE_ATOMIC_DHT_REPLY_AFTER_ACK)
-                sendDhtNearResponse(nodeId, req, nearRes);
-            else {
-                sendDhtNearResponse(null, req, nearRes);
+            sendDhtNearResponse(req, nearRes);
 
+            sendDeferredUpdateResponse(req.partition(), nodeId, req.futureId());
+        }
+        else {
+            if (dhtRes != null)
+                sendDhtPrimaryResponse(nodeId, req, dhtRes);
+            else
                 sendDeferredUpdateResponse(req.partition(), nodeId, req.futureId());
-            }
         }
-        else
-            sendDeferredUpdateResponse(req.partition(), nodeId, req.futureId());
-
-        if (dhtRes != null)
-            sendDhtPrimaryResponse(nodeId, req, dhtRes);
     }
 
     /**
@@ -3306,44 +3253,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     /**
      *
      */
-    private class DeferredResponseClosure implements IgniteInClosure<IgniteException>, Runnable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private final int part;
-
-        /** */
-        private final UUID primaryId;
-
-        /** */
-        private final long futId;
-
-        /**
-         * @param part Partition ID.
-         * @param primaryId Primary ID.
-         * @param futId Future ID.
-         */
-        DeferredResponseClosure(int part, UUID primaryId, long futId) {
-            this.part = part;
-            this.primaryId = primaryId;
-            this.futId = futId;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void run() {
-            sendDeferredUpdateResponse(part, primaryId, futId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void apply(IgniteException e) {
-            ctx.kernalContext().getStripedExecutorService().execute(part, this);
-        }
-    }
-
-    /**
-     *
-     */
     private class DeferredUpdateTimeout implements GridTimeoutObject, Runnable {
         /** */
         private final int part;
@@ -3483,33 +3392,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /**
-     * @param primaryId Primary node ID.
      * @param req Request.
      * @param nearRes Response to send.
      */
-    private void sendDhtNearResponse(final UUID primaryId,
-        final GridDhtAtomicAbstractUpdateRequest req,
-        GridDhtAtomicNearResponse nearRes) {
-        DeferredResponseClosure c = primaryId != null ?
-            new DeferredResponseClosure(req.partition(), primaryId, req.futureId()) : null;
-
+    private void sendDhtNearResponse(final GridDhtAtomicAbstractUpdateRequest req, GridDhtAtomicNearResponse nearRes) {
         try {
-            ClusterNode node = ctx.discovery().node(req.nearNodeId());
-
-            if (node == null)
-                throw new ClusterTopologyCheckedException("Node left: " + req.nearNodeId());
-
-            if (c != null) {
-                ctx.gridIO().send(node,
-                    TOPIC_CACHE,
-                    nearRes,
-                    ctx.ioPolicy(),
-                    c);
-
-                c = null;
-            }
-            else
-                ctx.gridIO().send(node, TOPIC_CACHE, nearRes, ctx.ioPolicy());
+            ctx.gridIO().send(req.nearNodeId(), TOPIC_CACHE, nearRes, ctx.ioPolicy());
 
             if (msgLog.isDebugEnabled()) {
                 msgLog.debug("Sent DHT near response [futId=" + req.futureId() +
@@ -3531,35 +3419,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 ", node=" + req.nearNodeId() +
                 ", res=" + nearRes + ']', e);
         }
-        finally {
-            if (c != null)
-                c.apply(null);
-        }
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param res Response.
-     */
-    private void processDhtAtomicNearMappingResponse(UUID nodeId, GridNearAtomicMappingResponse res) {
-        GridNearAtomicAbstractUpdateFuture updateFut =
-            (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
-
-        if (updateFut != null) {
-            if (msgLog.isDebugEnabled()) {
-                msgLog.debug("Received near mapping response [futId=" + res.futureId() +
-                    ", node=" + nodeId + ']');
-            }
-
-            updateFut.onMappingReceived(nodeId, res);
-        }
-        else {
-            if (msgLog.isDebugEnabled()) {
-                msgLog.debug("Failed to find future for near mapping response [futId=" + res.futureId() +
-                    ", node=" + nodeId +
-                    ", res=" + res + ']');
-            }
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
index b397f0f..595e41a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
@@ -18,25 +18,19 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
 import java.nio.ByteBuffer;
-import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_AFF_MAPPING_FLAG_MASK;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_HAS_RESULT_MASK;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_RESULT_SUCCESS_MASK;
 
 /**
@@ -59,19 +53,11 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
     private UUID primaryId;
 
     /** */
-    @GridDirectCollection(UUID.class)
-    @GridToStringInclude
-    private List<UUID> mapping;
-
-    /** */
     @GridToStringExclude
     private byte flags;
 
     /** */
-    private UpdateErrors errors;
-
-    /** */
-    private UUID failedNodeId;
+    private UpdateErrors errs;
 
     /**
      *
@@ -85,14 +71,12 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
      * @param partId Partition.
      * @param futId Future ID.
      * @param primaryId Primary node ID.
-     * @param mapping Update mapping.
      * @param flags Flags.
      */
     public GridDhtAtomicNearResponse(int cacheId,
         int partId,
         long futId,
         UUID primaryId,
-        List<UUID> mapping,
         byte flags)
     {
         assert primaryId != null;
@@ -101,54 +85,21 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
         this.partId = partId;
         this.futId = futId;
         this.primaryId = primaryId;
-        this.mapping = mapping;
         this.flags = flags;
     }
 
     /**
-     * @return {@code True} if update mapping matches affinity function result.
-     */
-    boolean affinityMapping() {
-        return isFlag(DHT_ATOMIC_AFF_MAPPING_FLAG_MASK);
-    }
-
-    /**
      * @return Errors.
      */
     @Nullable UpdateErrors errors() {
-        return errors;
+        return errs;
     }
 
     /**
      * @param errors Errors.
      */
     void errors(UpdateErrors errors) {
-        this.errors = errors;
-    }
-
-    /**
-     * @return Failed node ID.
-     */
-    UUID failedNodeId() {
-        return failedNodeId;
-    }
-
-    /**
-     * @param failedNodeId Failed node ID (used when primary notifies near node).
-     */
-    void failedNodeId(UUID failedNodeId) {
-        assert failedNodeId != null;
-
-        this.failedNodeId = failedNodeId;
-
-        setFlag(true, DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE);
-    }
-
-    /**
-     * @return {@code True} if message is sent from primary when DHT node fails.
-     */
-    boolean primaryDhtFailureResponse() {
-        return isFlag(DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE);
+        this.errs = errors;
     }
 
     /**
@@ -168,10 +119,10 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
      * @param e Error.
      */
     public void addFailedKey(KeyCacheObject key, Throwable e) {
-        if (errors == null)
-            errors = new UpdateErrors();
+        if (errs == null)
+            errs = new UpdateErrors();
 
-        errors.addFailedKey(key, e);
+        errs.addFailedKey(key, e);
     }
 
     /**
@@ -191,13 +142,6 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
     }
 
     /**
-     * @return Update mapping.
-     */
-    public List<UUID> mapping() {
-        return mapping;
-    }
-
-    /**
      * @param flag Set or clear.
      * @param mask Mask.
      */
@@ -246,16 +190,16 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (errors != null)
-            errors.prepareMarshal(this, ctx.cacheContext(cacheId));
+        if (errs != null)
+            errs.prepareMarshal(this, ctx.cacheContext(cacheId));
     }
 
     /** {@inheritDoc} */
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (errors != null)
-            errors.finishUnmarshal(this, ctx.cacheContext(cacheId), ldr);
+        if (errs != null)
+            errs.finishUnmarshal(this, ctx.cacheContext(cacheId), ldr);
     }
 
     /** {@inheritDoc} */
@@ -274,13 +218,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeMessage("errors", errors))
-                    return false;
-
-                writer.incrementState();
-
-            case 4:
-                if (!writer.writeUuid("failedNodeId", failedNodeId))
+                if (!writer.writeMessage("errs", errs))
                     return false;
 
                 writer.incrementState();
@@ -297,12 +235,6 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
 
                 writer.incrementState();
 
-            case 7:
-                if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
-                    return false;
-
-                writer.incrementState();
-
             case 8:
                 if (!writer.writeInt("partId", partId))
                     return false;
@@ -332,15 +264,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
 
         switch (reader.state()) {
             case 3:
-                errors = reader.readMessage("errors");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 4:
-                failedNodeId = reader.readUuid("failedNodeId");
+                errs = reader.readMessage("errs");
 
                 if (!reader.isLastRead())
                     return false;
@@ -363,14 +287,6 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
 
                 reader.incrementState();
 
-            case 7:
-                mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
             case 8:
                 partId = reader.readInt("partId");
 
@@ -394,10 +310,9 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridDhtAtomicNearResponse.class, this, "flags",
+        return S.toString(GridDhtAtomicNearResponse.class, this,
+            "flags",
             "res=" + isFlag(DHT_ATOMIC_HAS_RESULT_MASK) +
-            "|resOk=" + isFlag(DHT_ATOMIC_RESULT_SUCCESS_MASK) +
-            "|affMap=" + isFlag(DHT_ATOMIC_AFF_MAPPING_FLAG_MASK) +
-            "|dhtFail=" + isFlag(DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE));
+            "|resOk=" + isFlag(DHT_ATOMIC_RESULT_SUCCESS_MASK));
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 2cc370f..e393322 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -47,9 +47,11 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
     GridDhtAtomicSingleUpdateFuture(
         GridCacheContext cctx,
         GridCacheVersion writeVer,
-        GridNearAtomicAbstractUpdateRequest updateReq
+        GridNearAtomicAbstractUpdateRequest updateReq,
+        GridNearAtomicUpdateResponse updateRes,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb
     ) {
-        super(cctx, writeVer, updateReq);
+        super(cctx, writeVer, updateReq, updateRes, completionCb);
     }
 
     /** {@inheritDoc} */
@@ -67,7 +69,6 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
     /** {@inheritDoc} */
     @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(
         UUID nodeId,
-        UUID nearNodeId,
         long futId,
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
@@ -81,8 +82,6 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
                 cctx.cacheId(),
                 nodeId,
                 futId,
-                nearNodeId,
-                updateReq.futureId(),
                 writeVer,
                 syncMode,
                 topVer,
@@ -97,8 +96,6 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
                 cctx.cacheId(),
                 nodeId,
                 futId,
-                nearNodeId,
-                updateReq.futureId(),
                 writeVer,
                 syncMode,
                 topVer,
@@ -112,6 +109,11 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
         }
     }
 
+    /** {@inheritDoc} */
+    @Override protected void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err) {
+
+    }
+
     /**
      * @param ttl TTL.
      * @param conflictExpireTime Conflict expire time.

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index fa7c445..3d3ce04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -86,8 +86,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
         int cacheId,
         UUID nodeId,
         long futId,
-        UUID nearNodeId,
-        long nearFutId,
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
         @NotNull AffinityTopologyVersion topVer,
@@ -100,8 +98,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
         super(cacheId,
             nodeId,
             futId,
-            nearNodeId,
-            nearFutId,
             writeVer,
             syncMode,
             topVer,

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 1c12193..ed57cf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -46,9 +46,11 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
     GridDhtAtomicUpdateFuture(
         GridCacheContext cctx,
         GridCacheVersion writeVer,
-        GridNearAtomicAbstractUpdateRequest updateReq
+        GridNearAtomicAbstractUpdateRequest updateReq,
+        GridNearAtomicUpdateResponse updateRes,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb
     ) {
-        super(cctx, writeVer, updateReq);
+        super(cctx, writeVer, updateReq, updateRes, completionCb);
 
         mappings = U.newHashMap(updateReq.size());
     }
@@ -66,7 +68,6 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
     /** {@inheritDoc} */
     @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(
         UUID nodeId,
-        UUID nearNodeId,
         long futId,
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
@@ -79,8 +80,6 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
             cctx.cacheId(),
             nodeId,
             futId,
-            nearNodeId,
-            updateReq.futureId(),
             writeVer,
             syncMode,
             topVer,
@@ -94,6 +93,11 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
     }
 
     /** {@inheritDoc} */
+    @Override protected void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err) {
+
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtAtomicUpdateFuture.class, this, "super", super.toString());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index ef42af8..029ea42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -147,8 +147,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
         int cacheId,
         UUID nodeId,
         long futId,
-        UUID nearNodeId,
-        long nearFutId,
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
         @NotNull AffinityTopologyVersion topVer,
@@ -163,8 +161,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
         super(cacheId,
             nodeId,
             futId,
-            nearNodeId,
-            nearFutId,
             writeVer,
             syncMode,
             topVer,

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index b1a46d5..10806b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -53,7 +53,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
     private long futId;
 
     /** */
-    private UpdateErrors errors;
+    private UpdateErrors errs;
 
     /** Evicted readers. */
     @GridToStringInclude
@@ -84,10 +84,21 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
     }
 
     /**
+     * @param key Key.
+     * @param e Error.
+     */
+    public void addFailedKey(KeyCacheObject key, Throwable e) {
+        if (errs == null)
+            errs = new UpdateErrors();
+
+        errs.addFailedKey(key, e);
+    }
+
+    /**
      * @return Errors.
      */
     @Nullable UpdateErrors errors() {
-        return errors;
+        return errs;
     }
 
     /** {@inheritDoc} */
@@ -108,15 +119,15 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
      * @param err Error.
      */
     public void onError(IgniteCheckedException err){
-        if (errors == null)
-            errors = new UpdateErrors();
+        if (errs == null)
+            errs = new UpdateErrors();
 
-        errors.onError(err);
+        errs.onError(err);
     }
 
     /** {@inheritDoc} */
     @Override public IgniteCheckedException error() {
-        return errors != null ? errors.error() : null;
+        return errs != null ? errs.error() : null;
     }
 
     /**
@@ -158,8 +169,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
 
         prepareMarshalCacheObjects(nearEvicted, cctx);
 
-        if (errors != null)
-            errors.prepareMarshal(this, cctx);
+        if (errs != null)
+            errs.prepareMarshal(this, cctx);
     }
 
     /** {@inheritDoc} */
@@ -170,8 +181,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
 
         finishUnmarshalCacheObjects(nearEvicted, cctx, ldr);
 
-        if (errors != null)
-            errors.finishUnmarshal(this, cctx, ldr);
+        if (errs != null)
+            errs.finishUnmarshal(this, cctx, ldr);
     }
 
     /** {@inheritDoc} */
@@ -200,7 +211,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeMessage("errors", errors))
+                if (!writer.writeMessage("errors", errs))
                     return false;
 
                 writer.incrementState();
@@ -240,7 +251,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
 
         switch (reader.state()) {
             case 3:
-                errors = reader.readMessage("errors");
+                errs = reader.readMessage("errors");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
index 2a17813..3a9055e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
@@ -18,19 +18,13 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
 import java.io.Externalizable;
-import java.nio.ByteBuffer;
 import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
-import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -44,59 +38,6 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
     /** */
     private static final CacheEntryPredicate[] NO_FILTER = new CacheEntryPredicate[0];
 
-    /** Fast map flag mask. */
-    private static final int FAST_MAP_FLAG_MASK = 0x1;
-
-    /** Flag indicating whether request contains primary keys. */
-    private static final int HAS_PRIMARY_FLAG_MASK = 0x2;
-
-    /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
-    private static final int TOP_LOCKED_FLAG_MASK = 0x4;
-
-    /** Skip write-through to a persistent storage. */
-    private static final int SKIP_STORE_FLAG_MASK = 0x8;
-
-    /** */
-    private static final int CLIENT_REQ_FLAG_MASK = 0x10;
-
-    /** Keep binary flag. */
-    private static final int KEEP_BINARY_FLAG_MASK = 0x20;
-
-    /** Return value flag. */
-    private static final int RET_VAL_FLAG_MASK = 0x40;
-
-    /** Target node ID. */
-    @GridDirectTransient
-    protected UUID nodeId;
-
-    /** Future version. */
-    protected long futId;
-
-    /** Update version. Set to non-null if fastMap is {@code true}. */
-    private GridCacheVersion updateVer;
-
-    /** Topology version. */
-    protected AffinityTopologyVersion topVer;
-
-    /** Write synchronization mode. */
-    protected CacheWriteSynchronizationMode syncMode;
-
-    /** Update operation. */
-    protected GridCacheOperation op;
-
-    /** Subject ID. */
-    protected UUID subjId;
-
-    /** Task name hash. */
-    protected int taskNameHash;
-
-    /** */
-    @GridDirectTransient
-    private GridNearAtomicUpdateResponse res;
-
-    /** Compressed boolean flags. */
-    protected byte flags;
-
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -110,8 +51,6 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
      * @param cacheId Cache ID.
      * @param nodeId Node ID.
      * @param futId Future ID.
-     * @param fastMap Fast map scheme flag.
-     * @param updateVer Update version set if fast map is performed.
      * @param topVer Topology version.
      * @param topLocked Topology locked flag.
      * @param syncMode Synchronization mode.
@@ -121,15 +60,12 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
      * @param taskNameHash Task name hash code.
      * @param skipStore Skip write-through to a persistent storage.
      * @param keepBinary Keep binary flag.
-     * @param clientReq Client node request flag.
      * @param addDepInfo Deployment info flag.
      */
     protected GridNearAtomicAbstractSingleUpdateRequest(
         int cacheId,
         UUID nodeId,
         long futId,
-        boolean fastMap,
-        @Nullable GridCacheVersion updateVer,
         @NotNull AffinityTopologyVersion topVer,
         boolean topLocked,
         CacheWriteSynchronizationMode syncMode,
@@ -137,89 +73,25 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
         boolean retval,
         @Nullable UUID subjId,
         int taskNameHash,
+        boolean stableTop,
         boolean skipStore,
         boolean keepBinary,
-        boolean clientReq,
         boolean addDepInfo
     ) {
-        this.cacheId = cacheId;
-        this.nodeId = nodeId;
-        this.futId = futId;
-        this.updateVer = updateVer;
-        this.topVer = topVer;
-        this.syncMode = syncMode;
-        this.op = op;
-        this.subjId = subjId;
-        this.taskNameHash = taskNameHash;
-        this.addDepInfo = addDepInfo;
-
-        fastMap(fastMap);
-        topologyLocked(topLocked);
-        returnValue(retval);
-        skipStore(skipStore);
-        keepBinary(keepBinary);
-        clientRequest(clientReq);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int lookupIndex() {
-        return CACHE_MSG_IDX;
-    }
-
-    /**
-     * @return Mapped node ID.
-     */
-    @Override public UUID nodeId() {
-        return nodeId;
-    }
-
-    /**
-     * @param nodeId Node ID.
-     */
-    @Override public void nodeId(UUID nodeId) {
-        this.nodeId = nodeId;
-    }
-
-    /**
-     * @return Subject ID.
-     */
-    @Override public UUID subjectId() {
-        return subjId;
-    }
-
-    /**
-     * @return Task name hash.
-     */
-    @Override public int taskNameHash() {
-        return taskNameHash;
-    }
-
-    /**
-     * @return Future version.
-     */
-    @Override public long futureId() {
-        return futId;
-    }
-
-    /**
-     * @return Update version for fast-map request.
-     */
-    @Override public GridCacheVersion updateVersion() {
-        return updateVer;
-    }
-
-    /**
-     * @return Topology version.
-     */
-    @Override public AffinityTopologyVersion topologyVersion() {
-        return topVer;
-    }
-
-    /**
-     * @return Cache write synchronization mode.
-     */
-    @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
-        return syncMode;
+        super(cacheId,
+            nodeId,
+            futId,
+            topVer,
+            topLocked,
+            syncMode,
+            op,
+            retval,
+            subjId,
+            taskNameHash,
+            stableTop,
+            skipStore,
+            keepBinary,
+            addDepInfo);
     }
 
     /**
@@ -230,331 +102,14 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
     }
 
     /**
-     * @return Update operation.
-     */
-    @Override public GridCacheOperation operation() {
-        return op;
-    }
-
-    /**
      * @return Optional arguments for entry processor.
      */
     @Override @Nullable public Object[] invokeArguments() {
         return null;
     }
 
-    /**
-     * @param res Response.
-     * @return {@code True} if current response was {@code null}.
-     */
-    @Override public boolean onResponse(GridNearAtomicUpdateResponse res) {
-        if (this.res == null) {
-            this.res = res;
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * @return Response.
-     */
-    @Override @Nullable public GridNearAtomicUpdateResponse response() {
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean addDeploymentInfo() {
-        return addDepInfo;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
-        return ctx.atomicMessageLogger();
-    }
-
-    /**
-     * @return Flag indicating whether this is fast-map udpate.
-     */
-    @Override public boolean fastMap() {
-        return isFlag(FAST_MAP_FLAG_MASK);
-    }
-
-    /**
-     * Sets fastMap flag value.
-     */
-    public void fastMap(boolean val) {
-        setFlag(val, FAST_MAP_FLAG_MASK);
-    }
-
-    /**
-     * @return Topology locked flag.
-     */
-    @Override public boolean topologyLocked() {
-        return isFlag(TOP_LOCKED_FLAG_MASK);
-    }
-
-    /**
-     * Sets topologyLocked flag value.
-     */
-    public void topologyLocked(boolean val) {
-        setFlag(val, TOP_LOCKED_FLAG_MASK);
-    }
-
-    /**
-     * @return {@code True} if request sent from client node.
-     */
-    @Override public boolean clientRequest() {
-        return isFlag(CLIENT_REQ_FLAG_MASK);
-    }
-
-    /**
-     * Sets clientRequest flag value.
-     */
-    public void clientRequest(boolean val) {
-        setFlag(val, CLIENT_REQ_FLAG_MASK);
-    }
-
-    /**
-     * @return Return value flag.
-     */
-    @Override public boolean returnValue() {
-        return isFlag(RET_VAL_FLAG_MASK);
-    }
-
-    /**
-     * Sets returnValue flag value.
-     */
-    public void returnValue(boolean val) {
-        setFlag(val, RET_VAL_FLAG_MASK);
-    }
-
-    /**
-     * @return Skip write-through to a persistent storage.
-     */
-    @Override public boolean skipStore() {
-        return isFlag(SKIP_STORE_FLAG_MASK);
-    }
-
-    /**
-     * Sets skipStore flag value.
-     */
-    public void skipStore(boolean val) {
-        setFlag(val, SKIP_STORE_FLAG_MASK);
-    }
-
-    /**
-     * @return Keep binary flag.
-     */
-    @Override public boolean keepBinary() {
-        return isFlag(KEEP_BINARY_FLAG_MASK);
-    }
-
-    /**
-     * Sets keepBinary flag value.
-     */
-    public void keepBinary(boolean val) {
-        setFlag(val, KEEP_BINARY_FLAG_MASK);
-    }
-
-    /**
-     * @return Flag indicating whether this request contains primary keys.
-     */
-    @Override public boolean hasPrimary() {
-        return isFlag(HAS_PRIMARY_FLAG_MASK);
-    }
-
-    /**
-     * Sets hasPrimary flag value.
-     */
-    public void hasPrimary(boolean val) {
-        setFlag(val, HAS_PRIMARY_FLAG_MASK);
-    }
-
     /** {@inheritDoc} */
     @Nullable @Override public CacheEntryPredicate[] filter() {
         return NO_FILTER;
     }
-
-    /**
-     * Sets flag mask.
-     *
-     * @param flag Set or clear.
-     * @param mask Mask.
-     */
-    private void setFlag(boolean flag, int mask) {
-        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
-    }
-
-    /**
-     * Reads flag mask.
-     *
-     * @param mask Mask to read.
-     * @return Flag value.
-     */
-    private boolean isFlag(int mask) {
-        return (flags & mask) != 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!super.writeTo(buf, writer))
-            return false;
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 3:
-                if (!writer.writeByte("flags", flags))
-                    return false;
-
-                writer.incrementState();
-
-            case 4:
-                if (!writer.writeLong("futId", futId))
-                    return false;
-
-                writer.incrementState();
-
-            case 5:
-                if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
-                    return false;
-
-                writer.incrementState();
-
-            case 6:
-                if (!writer.writeUuid("subjId", subjId))
-                    return false;
-
-                writer.incrementState();
-
-            case 7:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
-                    return false;
-
-                writer.incrementState();
-
-            case 8:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
-                    return false;
-
-                writer.incrementState();
-
-            case 9:
-                if (!writer.writeMessage("topVer", topVer))
-                    return false;
-
-                writer.incrementState();
-
-            case 10:
-                if (!writer.writeMessage("updateVer", updateVer))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        if (!super.readFrom(buf, reader))
-            return false;
-
-        switch (reader.state()) {
-            case 3:
-                flags = reader.readByte("flags");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 4:
-                futId = reader.readLong("futId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 5:
-                byte opOrd;
-
-                opOrd = reader.readByte("op");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                op = GridCacheOperation.fromOrdinal(opOrd);
-
-                reader.incrementState();
-
-            case 6:
-                subjId = reader.readUuid("subjId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 7:
-                byte syncModeOrd;
-
-                syncModeOrd = reader.readByte("syncMode");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
-                reader.incrementState();
-
-            case 8:
-                taskNameHash = reader.readInt("taskNameHash");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 9:
-                topVer = reader.readMessage("topVer");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 10:
-                updateVer = reader.readMessage("updateVer");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return reader.afterMessageRead(GridNearAtomicAbstractSingleUpdateRequest.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 11;
-    }
 }