You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/15 15:18:47 UTC

[1/2] ignite git commit: ignite-4705

Repository: ignite
Updated Branches:
  refs/heads/ignite-4705 [created] 7287a9368


http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 891a20c..8c3a364 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
@@ -41,7 +43,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedExceptio
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -49,19 +50,14 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteProductVersion;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 
 /**
  * DHT atomic cache near update future.
  */
 public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpdateFuture {
-    /** */
-    private static final IgniteProductVersion SINGLE_UPDATE_REQUEST = IgniteProductVersion.fromString("1.7.4");
-
     /** Keys */
     private Object key;
 
@@ -69,9 +65,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     private Object val;
 
-    /** Not null is operation is mapped to single node. */
+    /** Current request. */
     private GridNearAtomicAbstractUpdateRequest req;
 
+    /** IDs of nodes from the DHT response mapping that have not responded yet; created lazily. */
+    private Set<UUID> mapping;
+
     /**
      * @param cctx Cache context.
      * @param cache Cache instance.
@@ -120,9 +119,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheVersion version() {
+    @Override public Long id() {
         synchronized (mux) {
-            return futVer;
+            return futId; // May be null, e.g. after onFutureDone() has reset it.
         }
     }
 
@@ -138,7 +137,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             if (req != null && req.response() == null) {
                 res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
                     nodeId,
-                    req.futureVersion(),
+                    req.futureId(),
                     cctx.deploymentEnabled());
 
                 ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
@@ -152,7 +151,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
         if (res != null) {
             if (msgLog.isDebugEnabled()) {
-                msgLog.debug("Near update single fut, node left [futId=" + req.futureVersion() +
+                msgLog.debug("Near update single fut, node left [futId=" + req.futureId() +
                     ", writeVer=" + req.updateVersion() +
                     ", node=" + nodeId + ']');
             }
@@ -183,7 +182,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             retval = Collections.emptyMap();
 
         if (super.onDone(retval, err)) {
-            GridCacheVersion futVer = onFutureDone();
+            Long futVer = onFutureDone();
 
             if (futVer != null)
                 cctx.mvcc().removeAtomicFuture(futVer);
@@ -195,6 +194,33 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     }
 
     /** {@inheritDoc} */
+    @Override public void onResult(UUID nodeId, GridNearAtomicDhtResponse res) {
+        GridCacheReturn opRes0 = null; // Result to complete with; assigned only under mux.
+
+        synchronized (mux) {
+            if (futId == null || futId != res.futureId()) // NOTE(review): confirm futureId() is a primitive long.
+                return; // Stale response: future ID is unset or differs.
+
+            if (mapping == null)
+                mapping = new HashSet<>(res.mapping()); // Lazily copied from the first accepted response.
+
+            mapping.remove(nodeId); // The sender of this response is no longer awaited.
+
+            if (opRes == null && res.hasResult())
+                opRes = res.result(); // Taken only if no result has been set yet.
+
+            if (mapping.isEmpty() && opRes != null) { // Nothing left in the mapping and a result is available.
+                opRes0 = opRes;
+
+                futId = null; // Later responses are dropped by the ID check above.
+            }
+        }
+
+        if (opRes0 != null)
+            onDone(opRes0); // Completion happens outside of the mux lock.
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
     @Override public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
         GridNearAtomicAbstractUpdateRequest req;
@@ -207,7 +233,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         GridFutureAdapter<?> fut0 = null;
 
         synchronized (mux) {
-            if (!res.futureVersion().equals(futVer))
+            if (futId == null || futId != res.futureId())
                 return;
 
             if (!this.req.nodeId().equals(nodeId))
@@ -238,24 +264,22 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 }
             }
             else {
-                if (!req.fastMap() || req.hasPrimary()) {
-                    GridCacheReturn ret = res.returnValue();
-
-                    if (op == TRANSFORM) {
-                        if (ret != null) {
-                            assert ret.value() == null || ret.value() instanceof Map : ret.value();
-
-                            if (ret.value() != null) {
-                                if (opRes != null)
-                                    opRes.mergeEntryProcessResults(ret);
-                                else
-                                    opRes = ret;
-                            }
+                GridCacheReturn ret = res.returnValue();
+
+                if (op == TRANSFORM) {
+                    if (ret != null) {
+                        assert ret.value() == null || ret.value() instanceof Map : ret.value();
+
+                        if (ret.value() != null) {
+                            if (opRes != null)
+                                opRes.mergeEntryProcessResults(ret);
+                            else
+                                opRes = ret;
                         }
                     }
-                    else
-                        opRes = ret;
                 }
+                else
+                    opRes = ret;
             }
 
             if (remapKey) {
@@ -282,7 +306,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                             new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
 
                         err = null;
-                        updVer = null;
                     }
                 }
             }
@@ -296,9 +319,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
                 topCompleteFut = null;
 
-                cctx.mvcc().removeAtomicFuture(futVer);
+                cctx.mvcc().removeAtomicFuture(futId);
 
-                futVer = null;
+                futId = null;
                 topVer = AffinityTopologyVersion.ZERO;
             }
         }
@@ -379,11 +402,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
     /** {@inheritDoc} */
     @Override protected void mapOnTopology() {
+        // TODO IGNITE-4705: primary should block topology change, so it seems the read lock is not needed.
         cache.topology().readLock();
 
         AffinityTopologyVersion topVer;
 
-        GridCacheVersion futVer;
+        Long futId;
 
         try {
             if (cache.topology().stopping()) {
@@ -406,7 +430,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
                 topVer = fut.topologyVersion();
 
-                futVer = addAtomicFuture(topVer);
+                futId = addAtomicFuture(topVer);
             }
             else {
                 if (waitTopFut) {
@@ -432,49 +456,22 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             cache.topology().readUnlock();
         }
 
-        if (futVer != null)
-            map(topVer, futVer);
+        if (futId != null)
+            map(topVer, futId);
     }
 
     /** {@inheritDoc} */
-    @Override protected void map(AffinityTopologyVersion topVer, GridCacheVersion futVer) {
-        Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
-
-        if (F.isEmpty(topNodes)) {
-            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
-                "left the grid)."));
-
-            return;
-        }
-
-        GridCacheVersion updVer;
-
-        // Assign version on near node in CLOCK ordering mode even if fastMap is false.
-        if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
-            updVer = this.updVer;
-
-            if (updVer == null) {
-                updVer = cctx.versions().next(topVer);
-
-                if (log.isDebugEnabled())
-                    log.debug("Assigned fast-map version for update on near node: " + updVer);
-            }
-        }
-        else
-            updVer = null;
-
+    @Override protected void map(AffinityTopologyVersion topVer, Long futId) {
         Exception err = null;
         GridNearAtomicAbstractUpdateRequest singleReq0 = null;
 
         try {
-            singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+            singleReq0 = mapSingleUpdate(topVer, futId);
 
             synchronized (mux) {
-                assert this.futVer == futVer || (this.isDone() && this.error() != null);
+                assert futId.equals(this.futId) || (this.isDone() && this.error() != null); // this.futId may be null.
                 assert this.topVer == topVer;
 
-                this.updVer = updVer;
-
                 resCnt = 0;
 
                 req = singleReq0;
@@ -495,10 +492,10 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     }
 
     /**
-     * @return Future version.
+     * @return Future ID.
      */
-    private GridCacheVersion onFutureDone() {
-        GridCacheVersion ver0;
+    private Long onFutureDone() {
+        Long id0;
 
         GridFutureAdapter<Void> fut0;
 
@@ -507,27 +504,25 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
             topCompleteFut = null;
 
-            ver0 = futVer;
+            id0 = futId;
 
-            futVer = null;
+            futId = null;
         }
 
         if (fut0 != null)
             fut0.onDone();
 
-        return ver0;
+        return id0;
     }
 
     /**
      * @param topVer Topology version.
-     * @param futVer Future version.
-     * @param updVer Update version.
+     * @param futId Future ID.
      * @return Request.
      * @throws Exception If failed.
      */
-    private GridNearAtomicAbstractUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
-        GridCacheVersion futVer,
-        @Nullable GridCacheVersion updVer) throws Exception {
+    private GridNearAtomicAbstractUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer, long futId)
+        throws Exception {
         if (key == null)
             throw new NullPointerException("Null key.");
 
@@ -551,14 +546,14 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
         GridNearAtomicAbstractUpdateRequest req;
 
-        if (canUseSingleRequest(primary)) {
+        if (canUseSingleRequest()) {
             if (op == TRANSFORM) {
                 req = new GridNearAtomicSingleUpdateInvokeRequest(
                     cctx.cacheId(),
                     primary.id(),
-                    futVer,
+                    futId,
                     false,
-                    updVer,
+                    null,
                     topVer,
                     topLocked,
                     syncMode,
@@ -577,9 +572,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                     req = new GridNearAtomicSingleUpdateRequest(
                         cctx.cacheId(),
                         primary.id(),
-                        futVer,
+                        futId,
                         false,
-                        updVer,
+                        null,
                         topVer,
                         topLocked,
                         syncMode,
@@ -596,9 +591,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                     req = new GridNearAtomicSingleUpdateFilterRequest(
                         cctx.cacheId(),
                         primary.id(),
-                        futVer,
+                        futId,
                         false,
-                        updVer,
+                        null,
                         topVer,
                         topLocked,
                         syncMode,
@@ -618,9 +613,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             req = new GridNearAtomicFullUpdateRequest(
                 cctx.cacheId(),
                 primary.id(),
-                futVer,
+                futId,
                 false,
-                updVer,
+                null,
                 topVer,
                 topLocked,
                 syncMode,
@@ -649,11 +644,10 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     }
 
     /**
-     * @param node Target node
      * @return {@code True} can use 'single' update requests.
      */
-    private boolean canUseSingleRequest(ClusterNode node) {
-        return expiryPlc == null && node != null && node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0;
+    private boolean canUseSingleRequest() {
+        return expiryPlc == null; // 'Single' request types are used only when no expiry policy is set.
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
index 6582063..269443f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
@@ -76,7 +76,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
      *
      * @param cacheId Cache ID.
      * @param nodeId Node ID.
-     * @param futVer Future version.
+     * @param futId Future ID.
      * @param fastMap Fast map scheme flag.
      * @param updateVer Update version set if fast map is performed.
      * @param topVer Topology version.
@@ -95,7 +95,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
     GridNearAtomicSingleUpdateInvokeRequest(
         int cacheId,
         UUID nodeId,
-        GridCacheVersion futVer,
+        long futId,
         boolean fastMap,
         @Nullable GridCacheVersion updateVer,
         @NotNull AffinityTopologyVersion topVer,
@@ -114,7 +114,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
         super(
             cacheId,
             nodeId,
-            futVer,
+            futId,
             fastMap,
             updateVer,
             topVer,

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index c3e9fbe..0045228 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -72,7 +72,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
      *
      * @param cacheId Cache ID.
      * @param nodeId Node ID.
-     * @param futVer Future version.
+     * @param futId Future ID.
      * @param fastMap Fast map scheme flag.
      * @param updateVer Update version set if fast map is performed.
      * @param topVer Topology version.
@@ -90,7 +90,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
     GridNearAtomicSingleUpdateRequest(
         int cacheId,
         UUID nodeId,
-        GridCacheVersion futVer,
+        long futId,
         boolean fastMap,
         @Nullable GridCacheVersion updateVer,
         @NotNull AffinityTopologyVersion topVer,
@@ -108,7 +108,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
         super(
             cacheId,
             nodeId,
-            futVer,
+            futId,
             fastMap,
             updateVer,
             topVer,

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 2315a18..c075f09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -154,9 +154,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheVersion version() {
+    @Override public Long id() {
         synchronized (mux) {
-            return futVer;
+            return futId; // May be null, e.g. after onFutureDone() has reset it.
         }
     }
 
@@ -175,7 +175,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             if (req != null && req.response() == null) {
                 res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
                     nodeId,
-                    req.futureVersion(),
+                    req.futureId(),
                     cctx.deploymentEnabled());
 
                 ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
@@ -189,7 +189,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         if (res != null) {
             if (msgLog.isDebugEnabled()) {
-                msgLog.debug("Near update fut, node left [futId=" + req.futureVersion() +
+                msgLog.debug("Near update fut, node left [futId=" + req.futureId() +
                     ", writeVer=" + req.updateVersion() +
                     ", node=" + nodeId + ']');
             }
@@ -244,10 +244,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             retval = Collections.emptyMap();
 
         if (super.onDone(retval, err)) {
-            GridCacheVersion futVer = onFutureDone();
+            Long futId = onFutureDone();
 
-            if (futVer != null)
-                cctx.mvcc().removeAtomicFuture(futVer);
+            if (futId != null)
+                cctx.mvcc().removeAtomicFuture(futId);
 
             return true;
         }
@@ -256,6 +256,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     }
 
     /** {@inheritDoc} */
+    @Override public void onResult(UUID nodeId, GridNearAtomicDhtResponse res) {
+        throw new UnsupportedOperationException(); // GridNearAtomicDhtResponse is not handled by this future.
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
     @Override public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
         GridNearAtomicFullUpdateRequest req;
@@ -270,7 +275,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         GridFutureAdapter<?> fut0 = null;
 
         synchronized (mux) {
-            if (!res.futureVersion().equals(futVer))
+            if (futId == null || futId != res.futureId())
                 return;
 
             if (singleReq != null) {
@@ -375,8 +380,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
                             for (Object key : failedKeys)
                                 remapKeys.add(cctx.toCacheKeyObject(key));
-
-                            updVer = null;
                         }
                     }
                 }
@@ -390,9 +393,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
                     topCompleteFut = null;
 
-                    cctx.mvcc().removeAtomicFuture(futVer);
+                    cctx.mvcc().removeAtomicFuture(futId);
 
-                    futVer = null;
+                    futId = null;
                     topVer = AffinityTopologyVersion.ZERO;
                 }
             }
@@ -492,7 +495,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         AffinityTopologyVersion topVer;
 
-        GridCacheVersion futVer;
+        Long futId;
 
         try {
             if (cache.topology().stopping()) {
@@ -515,7 +518,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
                 topVer = fut.topologyVersion();
 
-                futVer = addAtomicFuture(topVer);
+                futId = addAtomicFuture(topVer);
             }
             else {
                 if (waitTopFut) {
@@ -541,8 +544,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             cache.topology().readUnlock();
         }
 
-        if (futVer != null)
-            map(topVer, futVer, remapKeys);
+        if (futId != null)
+            map(topVer, futId, remapKeys);
     }
 
     /**
@@ -568,14 +571,14 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 
                     if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("Near update fut, sent request [futId=" + req.futureVersion() +
+                        msgLog.debug("Near update fut, sent request [futId=" + req.futureId() +
                             ", writeVer=" + req.updateVersion() +
                             ", node=" + req.nodeId() + ']');
                     }
                 }
                 catch (IgniteCheckedException e) {
                     if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("Near update fut, failed to send request [futId=" + req.futureVersion() +
+                        msgLog.debug("Near update fut, failed to send request [futId=" + req.futureId() +
                             ", writeVer=" + req.updateVersion() +
                             ", node=" + req.nodeId() +
                             ", err=" + e + ']');
@@ -600,17 +603,17 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     }
 
     /** {@inheritDoc} */
-    @Override protected void map(AffinityTopologyVersion topVer, GridCacheVersion futVer) {
-        map(topVer, futVer, null);
+    @Override protected void map(AffinityTopologyVersion topVer, Long futId) {
+        map(topVer, futId, null); // Delegates to the three-argument overload with remapKeys == null.
     }
 
     /**
      * @param topVer Topology version.
-     * @param futVer Future ID.
+     * @param futId Future ID.
      * @param remapKeys Keys to remap.
      */
     void map(AffinityTopologyVersion topVer,
-        GridCacheVersion futVer,
+        Long futId,
         @Nullable Collection<KeyCacheObject> remapKeys) {
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
@@ -625,14 +628,15 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         // Assign version on near node in CLOCK ordering mode even if fastMap is false.
         if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
-            updVer = this.updVer;
-
-            if (updVer == null) {
-                updVer = cctx.versions().next(topVer);
-
-                if (log.isDebugEnabled())
-                    log.debug("Assigned fast-map version for update on near node: " + updVer);
-            }
+// TODO IGNITE-4705 (get rid of updVer everywhere).
+//            updVer = this.updVer;
+//
+//            if (updVer == null) {
+//                updVer = cctx.versions().next(topVer);
+//
+//                if (log.isDebugEnabled())
+//                    log.debug("Assigned fast-map version for update on near node: " + updVer);
+//            }
         }
         else
             updVer = null;
@@ -647,13 +651,13 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             if (size == 1 && !fastMap) {
                 assert remapKeys == null || remapKeys.size() == 1;
 
-                singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+                singleReq0 = mapSingleUpdate(topVer, futId, null);
             }
             else {
                 Map<UUID, GridNearAtomicFullUpdateRequest> pendingMappings = mapUpdate(topNodes,
                     topVer,
-                    futVer,
-                    updVer,
+                    futId,
+                    null,
                     remapKeys);
 
                 if (pendingMappings.size() == 1)
@@ -675,10 +679,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             }
 
             synchronized (mux) {
-                assert this.futVer == futVer || (this.isDone() && this.error() != null);
+                assert futId.equals(this.futId) || (this.isDone() && this.error() != null); // this.futId may be null.
                 assert this.topVer == topVer;
 
-                this.updVer = updVer;
+                // TODO IGNITE-4705: this.updVer is no longer assigned here (get rid of updVer everywhere).
 
                 resCnt = 0;
 
@@ -714,8 +718,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     /**
-     * @return Future version.
+     * @return Future ID.
      */
-    private GridCacheVersion onFutureDone() {
-        GridCacheVersion ver0;
+    private Long onFutureDone() {
+        Long id0;
 
         GridFutureAdapter<Void> fut0;
 
@@ -724,21 +728,21 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
             topCompleteFut = null;
 
-            ver0 = futVer;
+            id0 = futId;
 
-            futVer = null;
+            futId = null;
         }
 
         if (fut0 != null)
             fut0.onDone();
 
-        return ver0;
+        return id0;
     }
 
     /**
      * @param topNodes Cache nodes.
      * @param topVer Topology version.
-     * @param futVer Future version.
+     * @param futId Future ID.
      * @param updVer Update version.
      * @param remapKeys Keys to remap.
      * @return Mapping.
@@ -747,7 +751,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     @SuppressWarnings("ForLoopReplaceableByForEach")
     private Map<UUID, GridNearAtomicFullUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
         AffinityTopologyVersion topVer,
-        GridCacheVersion futVer,
+        Long futId,
         @Nullable GridCacheVersion updVer,
         @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
         Iterator<?> it = null;
@@ -843,7 +847,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     mapped = new GridNearAtomicFullUpdateRequest(
                         cctx.cacheId(),
                         nodeId,
-                        futVer,
+                        futId,
                         fastMap,
                         updVer,
                         topVer,
@@ -876,13 +880,13 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
     /**
      * @param topVer Topology version.
-     * @param futVer Future version.
+     * @param futId Future ID.
      * @param updVer Update version.
      * @return Request.
      * @throws Exception If failed.
      */
     private GridNearAtomicFullUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
-        GridCacheVersion futVer,
+        Long futId,
         @Nullable GridCacheVersion updVer) throws Exception {
         Object key = F.first(keys);
 
@@ -945,7 +949,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
             cctx.cacheId(),
             primary.id(),
-            futVer,
+            futId,
             fastMap,
             updVer,
             topVer,

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 2e38733..b089193 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -59,8 +59,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     @GridDirectTransient
     private UUID nodeId;
 
-    /** Future version. */
-    private GridCacheVersion futVer;
+    /** Future ID. */
+    private long futId;
 
     /** Update error. */
     @GridDirectTransient
@@ -115,15 +115,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     /**
      * @param cacheId Cache ID.
      * @param nodeId Node ID this reply should be sent to.
-     * @param futVer Future version.
+     * @param futId Future ID.
      * @param addDepInfo Deployment info flag.
      */
-    public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, boolean addDepInfo) {
-        assert futVer != null;
-
+    public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, long futId, boolean addDepInfo) {
         this.cacheId = cacheId;
         this.nodeId = nodeId;
-        this.futVer = futVer;
+        this.futId = futId;
         this.addDepInfo = addDepInfo;
     }
 
@@ -147,10 +145,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     }
 
     /**
-     * @return Future version.
+     * @return Future ID.
      */
-    public GridCacheVersion futureVersion() {
-        return futVer;
+    public long futureId() {
+        return futId;
     }
 
     /**
@@ -468,7 +466,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeMessage("futVer", futVer))
+                if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
@@ -554,7 +552,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 reader.incrementState();
 
             case 5:
-                futVer = reader.readMessage("futVer");
+                futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index a1a18fe..5706536 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -264,7 +264,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
         txHnd = new IgniteTxHandler(cctx);
 
-        deferredAckMessageSender = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) {
+        deferredAckMessageSender = new GridDeferredAckMessageSender<GridCacheVersion>(cctx.time(), cctx.kernalContext().closure()) {
             @Override public int getTimeout() {
                 return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
             }


[2/2] ignite git commit: ignite-4705

Posted by sb...@apache.org.
ignite-4705


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7287a936
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7287a936
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7287a936

Branch: refs/heads/ignite-4705
Commit: 7287a93682d203f6d5043b681ef40968662d3f6c
Parents: 85b08c5
Author: sboikov <sb...@gridgain.com>
Authored: Wed Feb 15 16:47:54 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Feb 15 18:18:37 2017 +0300

----------------------------------------------------------------------
 .../ignite/codegen/MessageCodeGenerator.java    |  27 ++-
 .../communication/GridIoMessageFactory.java     |   6 +
 .../processors/cache/GridCacheAtomicFuture.java |   4 +-
 .../processors/cache/GridCacheIoManager.java    |  22 +-
 .../processors/cache/GridCacheMvccManager.java  |  40 ++--
 .../cache/GridDeferredAckMessageSender.java     |  10 +-
 .../GridDhtAtomicAbstractUpdateFuture.java      |  28 +--
 .../GridDhtAtomicAbstractUpdateRequest.java     |   2 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  94 ++++----
 .../GridDhtAtomicDeferredUpdateResponse.java    |  20 +-
 .../atomic/GridDhtAtomicSingleUpdateFuture.java |   6 +-
 .../GridDhtAtomicSingleUpdateRequest.java       |  16 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   4 +-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  18 +-
 .../dht/atomic/GridDhtAtomicUpdateResponse.java |  17 +-
 ...idNearAtomicAbstractSingleUpdateRequest.java |  18 +-
 .../GridNearAtomicAbstractUpdateFuture.java     |  37 ++--
 .../GridNearAtomicAbstractUpdateRequest.java    |   2 +-
 .../dht/atomic/GridNearAtomicDhtResponse.java   | 222 +++++++++++++++++++
 .../atomic/GridNearAtomicFullUpdateRequest.java |  18 +-
 ...GridNearAtomicSingleUpdateFilterRequest.java |   6 +-
 .../GridNearAtomicSingleUpdateFuture.java       | 170 +++++++-------
 ...GridNearAtomicSingleUpdateInvokeRequest.java |   6 +-
 .../GridNearAtomicSingleUpdateRequest.java      |   6 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  96 ++++----
 .../atomic/GridNearAtomicUpdateResponse.java    |  22 +-
 .../cache/transactions/IgniteTxManager.java     |   2 +-
 27 files changed, 596 insertions(+), 323 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 5a26187..16b1e01 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -44,6 +44,19 @@ import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractSingleUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicDhtResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
@@ -168,7 +181,19 @@ public class MessageCodeGenerator {
 
 //        gen.generateAll(true);
 
-//        gen.generateAndWrite(GridNearAtomicUpdateRequest.class);
+//        gen.generateAndWrite(GridNearAtomicDhtResponse.class);
+//        gen.generateAndWrite(GridNearAtomicAbstractUpdateRequest.class);
+//        gen.generateAndWrite(GridDhtAtomicAbstractUpdateRequest.class);
+//        gen.generateAndWrite(GridNearAtomicAbstractSingleUpdateRequest.class);
+//        gen.generateAndWrite(GridDhtAtomicDeferredUpdateResponse.class);
+//        gen.generateAndWrite(GridDhtAtomicSingleUpdateRequest.class);
+//        gen.generateAndWrite(GridDhtAtomicUpdateRequest.class);
+//        gen.generateAndWrite(GridDhtAtomicUpdateResponse.class);
+//        gen.generateAndWrite(GridNearAtomicFullUpdateRequest.class);
+//        gen.generateAndWrite(GridNearAtomicSingleUpdateFilterRequest.class);
+//        gen.generateAndWrite(GridNearAtomicSingleUpdateInvokeRequest.class);
+//        gen.generateAndWrite(GridNearAtomicSingleUpdateRequest.class);
+//        gen.generateAndWrite(GridNearAtomicUpdateResponse.class);
 
 //        gen.generateAndWrite(GridMessageCollection.class);
 //        gen.generateAndWrite(DataStreamerEntry.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6f95400..769a615 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -70,6 +70,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicDhtResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
@@ -173,6 +174,11 @@ public class GridIoMessageFactory implements MessageFactory {
         Message msg = null;
 
         switch (type) {
+            case -45:
+                msg = new GridNearAtomicDhtResponse();
+
+                break;
+
             case -44:
                 msg = new TcpCommunicationSpi.HandshakeMessage2();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 3e11d50..565f11e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -26,9 +26,9 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
  */
 public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
     /**
-     * @return Future version.
+     * @return Future ID.
      */
-    public GridCacheVersion version();
+    public Long id();
 
     /**
      * Gets future that will be completed when it is safe when update is finished on the given version of topology.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 924ce79..f9952b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -467,15 +467,15 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      * @param cacheMsg Cache message.
      * @return Atomic future ID if applicable for message.
      */
-    @Nullable private GridCacheVersion atomicFututeId(GridCacheMessage cacheMsg) {
+    @Nullable private Long atomicFututeId(GridCacheMessage cacheMsg) {
         if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest)
-            return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureVersion();
+            return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureId();
         else if (cacheMsg instanceof GridNearAtomicUpdateResponse)
-            return ((GridNearAtomicUpdateResponse) cacheMsg).futureVersion();
+            return ((GridNearAtomicUpdateResponse) cacheMsg).futureId();
         else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest)
-            return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).futureVersion();
+            return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).futureId();
         else if (cacheMsg instanceof GridDhtAtomicUpdateResponse)
-            return ((GridDhtAtomicUpdateResponse) cacheMsg).futureVersion();
+            return ((GridDhtAtomicUpdateResponse) cacheMsg).futureId();
 
         return null;
     }
@@ -557,7 +557,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
                     ctx.cacheId(),
-                    req.futureVersion(),
+                    req.futureId(),
                     ctx.deploymentEnabled());
 
                 res.onError(req.classError());
@@ -573,7 +573,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
                     ctx.cacheId(),
                     nodeId,
-                    req.futureVersion(),
+                    req.futureId(),
                     ctx.deploymentEnabled());
 
                 res.error(req.classError());
@@ -751,7 +751,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
                     ctx.cacheId(),
                     nodeId,
-                    req.futureVersion(),
+                    req.futureId(),
                     ctx.deploymentEnabled());
 
                 res.error(req.classError());
@@ -767,7 +767,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
                     ctx.cacheId(),
                     nodeId,
-                    req.futureVersion(),
+                    req.futureId(),
                     ctx.deploymentEnabled());
 
                 res.error(req.classError());
@@ -783,7 +783,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
                     ctx.cacheId(),
                     nodeId,
-                    req.futureVersion(),
+                    req.futureId(),
                     ctx.deploymentEnabled());
 
                 res.error(req.classError());
@@ -798,7 +798,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
                     ctx.cacheId(),
-                    req.futureVersion(),
+                    req.futureId(),
                     ctx.deploymentEnabled());
 
                 res.onError(req.classError());

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 0d0e9ee..e8a5c8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -106,8 +107,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     private final ConcurrentMap<GridCacheVersion, Collection<GridCacheMvccFuture<?>>> mvccFuts = newMap();
 
     /** Pending atomic futures. */
-    private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> atomicFuts =
-        new ConcurrentHashMap8<>();
+    private final ConcurrentMap<Long, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap8<>();
 
     /** Pending data streamer futures. */
     private final GridConcurrentHashSet<DataStreamerFuture> dataStreamerFuts = new GridConcurrentHashSet<>();
@@ -135,6 +135,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     /** */
     private volatile boolean stopping;
 
+    /** */
+    private final AtomicLong atomicFutId = new AtomicLong();
+
     /** Lock callback. */
     @GridToStringExclude
     private final GridCacheMvccCallback cb = new GridCacheMvccCallback() {
@@ -253,10 +256,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                 cacheFut.onNodeLeft(discoEvt.eventNode().id());
 
                 if (cacheFut.isCancelled() || cacheFut.isDone()) {
-                    GridCacheVersion futVer = cacheFut.version();
+                    Long futId = cacheFut.id();
 
-                    if (futVer != null)
-                        atomicFuts.remove(futVer, cacheFut);
+                    if (futId != null)
+                        atomicFuts.remove(futId, cacheFut);
                 }
             }
         }
@@ -423,14 +426,21 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * @param futVer Future ID.
+     * @return ID for atomic cache update future.
+     */
+    public long atomicFutureId() {
+        return atomicFutId.incrementAndGet();
+    }
+
+    /**
+     * @param futId Future ID.
      * @param fut Future.
      * @return {@code False} if future was forcibly completed with error.
      */
-    public boolean addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut) {
-        IgniteInternalFuture<?> old = atomicFuts.put(futVer, fut);
+    public boolean addAtomicFuture(Long futId, GridCacheAtomicFuture<?> fut) {
+        IgniteInternalFuture<?> old = atomicFuts.put(futId, fut);
 
-        assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']';
+        assert old == null : "Old future is not null [futId=" + futId + ", fut=" + fut + ", old=" + old + ']';
 
         return onFutureAdded(fut);
     }
@@ -452,19 +462,19 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     /**
      * Gets future by given future ID.
      *
-     * @param futVer Future ID.
+     * @param futId Future ID.
      * @return Future.
      */
-    @Nullable public IgniteInternalFuture<?> atomicFuture(GridCacheVersion futVer) {
-        return atomicFuts.get(futVer);
+    @Nullable public IgniteInternalFuture<?> atomicFuture(Long futId) {
+        return atomicFuts.get(futId);
     }
 
     /**
-     * @param futVer Future ID.
+     * @param futId Future ID.
      * @return Removed future.
      */
-    @Nullable public IgniteInternalFuture<?> removeAtomicFuture(GridCacheVersion futVer) {
-        return atomicFuts.remove(futVer);
+    @Nullable public IgniteInternalFuture<?> removeAtomicFuture(Long futId) {
+        return atomicFuts.remove(futId);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
index 7145dc2..8df883a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
@@ -33,7 +33,7 @@ import org.jsr166.ConcurrentLinkedDeque8;
 /**
  *
  */
-public abstract class GridDeferredAckMessageSender {
+public abstract class GridDeferredAckMessageSender<T> {
     /** Deferred message buffers. */
     private ConcurrentMap<UUID, DeferredAckMessageBuffer> deferredAckMsgBuffers = new ConcurrentHashMap8<>();
 
@@ -66,7 +66,7 @@ public abstract class GridDeferredAckMessageSender {
     /**
      *
      */
-    public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers);
+    public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8<T> vers);
 
     /**
      *
@@ -80,7 +80,7 @@ public abstract class GridDeferredAckMessageSender {
      * @param nodeId Node ID to send message to.
      * @param ver Version to ack.
      */
-    public void sendDeferredAckMessage(UUID nodeId, GridCacheVersion ver) {
+    public void sendDeferredAckMessage(UUID nodeId, T ver) {
         while (true) {
             DeferredAckMessageBuffer buf = deferredAckMsgBuffers.get(nodeId);
 
@@ -116,7 +116,7 @@ public abstract class GridDeferredAckMessageSender {
         private AtomicBoolean guard = new AtomicBoolean(false);
 
         /** Versions. */
-        private ConcurrentLinkedDeque8<GridCacheVersion> vers = new ConcurrentLinkedDeque8<>();
+        private ConcurrentLinkedDeque8<T> vers = new ConcurrentLinkedDeque8<>();
 
         /** Node ID. */
         private final UUID nodeId;
@@ -172,7 +172,7 @@ public abstract class GridDeferredAckMessageSender {
          * @param ver Version to send.
          * @return {@code True} if request was handled, {@code false} if this buffer is filled and cannot be used.
          */
-        public boolean add(GridCacheVersion ver) {
+        public boolean add(T ver) {
             readLock().lock();
 
             boolean snd = false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 1b175d0..10d1c4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -74,7 +74,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     protected final GridCacheContext cctx;
 
     /** Future version. */
-    protected final GridCacheVersion futVer;
+    protected final long futId;
 
     /** Completion callback. */
     @GridToStringExclude
@@ -114,7 +114,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         GridNearAtomicUpdateResponse updateRes) {
         this.cctx = cctx;
 
-        futVer = cctx.versions().next(updateReq.topologyVersion());
+        futId = cctx.mvcc().atomicFutureId();
         this.updateReq = updateReq;
         this.completionCb = completionCb;
         this.updateRes = updateRes;
@@ -191,7 +191,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                 if (updateReq == null) {
                     updateReq = createRequest(
                         node,
-                        futVer,
+                        futId,
                         writeVer,
                         syncMode,
                         topVer,
@@ -260,7 +260,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
                 updateReq = createRequest(
                     node,
-                    futVer,
+                    futId,
                     writeVer,
                     syncMode,
                     topVer,
@@ -297,12 +297,12 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
     /** {@inheritDoc} */
     @Override public final IgniteUuid futureId() {
-        return futVer.asGridUuid();
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Override public final GridCacheVersion version() {
-        return futVer;
+    @Override public final Long id() {
+        return futId;
     }
 
     /** {@inheritDoc} */
@@ -310,7 +310,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         boolean res = registerResponse(nodeId);
 
         if (res && msgLog.isDebugEnabled()) {
-            msgLog.debug("DTH update fut, node left [futId=" + futVer + ", writeVer=" + writeVer +
+            msgLog.debug("DTH update fut, node left [futId=" + futId + ", writeVer=" + writeVer +
                 ", node=" + nodeId + ']');
         }
 
@@ -358,20 +358,20 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                     cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 
                     if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("DTH update fut, sent request [futId=" + futVer +
+                        msgLog.debug("DTH update fut, sent request [futId=" + futId +
                             ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
                     }
                 }
                 catch (ClusterTopologyCheckedException ignored) {
                     if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("DTH update fut, failed to send request, node left [futId=" + futVer +
+                        msgLog.debug("DTH update fut, failed to send request, node left [futId=" + futId +
                             ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
                     }
 
                     registerResponse(req.nodeId());
                 }
                 catch (IgniteCheckedException ignored) {
-                    U.error(msgLog, "Failed to send request [futId=" + futVer +
+                    U.error(msgLog, "Failed to send request [futId=" + futId +
                         ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
 
                     registerResponse(req.nodeId());
@@ -401,7 +401,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
     /**
      * @param node Node.
-     * @param futVer Future version.
+     * @param futId Future ID.
      * @param writeVer Update version.
      * @param syncMode Write synchronization mode.
      * @param topVer Topology version.
@@ -412,7 +412,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      */
     protected abstract GridDhtAtomicAbstractUpdateRequest createRequest(
         ClusterNode node,
-        GridCacheVersion futVer,
+        long futId,
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
         @NotNull AffinityTopologyVersion topVer,
@@ -438,7 +438,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     /** {@inheritDoc} */
     @Override public final boolean onDone(@Nullable Void res, @Nullable Throwable err) {
         if (super.onDone(res, err)) {
-            cctx.mvcc().removeAtomicFuture(version());
+            cctx.mvcc().removeAtomicFuture(futId);
 
             boolean suc = err == null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index f0bea07..7aa440d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -168,7 +168,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
     /**
      * @return Version assigned on primary node.
      */
-    public abstract GridCacheVersion futureVersion();
+    public abstract long futureId();
 
     /**
      * @return Write version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 1b6179e..87ac54b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
@@ -97,6 +98,7 @@ import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
@@ -241,7 +243,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
-        deferredUpdateMsgSnd = new GridDeferredAckMessageSender(ctx.time(), ctx.closures()) {
+        deferredUpdateMsgSnd = new GridDeferredAckMessageSender<Long>(ctx.time(), ctx.closures()) {
             @Override public int getTimeout() {
                 return DEFERRED_UPDATE_RESPONSE_TIMEOUT;
             }
@@ -250,7 +252,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 return DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE;
             }
 
-            @Override public void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers) {
+            @Override public void finish(UUID nodeId, ConcurrentLinkedDeque8<Long> vers) {
                 GridDhtAtomicDeferredUpdateResponse msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(),
                     vers, ctx.deploymentEnabled());
 
@@ -261,7 +263,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         ctx.io().send(nodeId, msg, ctx.ioPolicy());
 
                         if (msgLog.isDebugEnabled()) {
-                            msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureVersions() +
+                            msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureIds() +
                                 ", node=" + nodeId + ']');
                         }
                     }
@@ -272,18 +274,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 catch (IllegalStateException ignored) {
                     if (msgLog.isDebugEnabled()) {
                         msgLog.debug("Failed to send deferred DHT update response, node is stopping [" +
-                            "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']');
+                            "futIds=" + msg.futureIds() + ", node=" + nodeId + ']');
                     }
                 }
                 catch (ClusterTopologyCheckedException ignored) {
                     if (msgLog.isDebugEnabled()) {
                         msgLog.debug("Failed to send deferred DHT update response, node left [" +
-                            "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']');
+                            "futIds=" + msg.futureIds() + ", node=" + nodeId + ']');
                     }
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to send deferred DHT update response to remote node [" +
-                        "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']', e);
+                        "futIds=" + msg.futureIds() + ", node=" + nodeId + ']', e);
                 }
             }
         };
@@ -420,6 +422,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             });
 
+        ctx.io().addHandler(ctx.cacheId(), GridNearAtomicDhtResponse.class, new CI2<UUID, GridNearAtomicDhtResponse>() {
+            @Override public void apply(UUID uuid, GridNearAtomicDhtResponse msg) {
+                processNearAtomicDhtResponse(uuid, msg); // Delegate to the dedicated processing method.
+            }
+
+            @Override public String toString() {
+                return "GridNearAtomicDhtResponse handler " +
+                    "[msgIdx=" + GridNearAtomicDhtResponse.CACHE_MSG_IDX + ']';
+            }
+        });
+
         if (near == null) {
             ctx.io().addHandler(
                 ctx.cacheId(),
@@ -1343,8 +1356,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         CacheEntryPredicate[] filters = CU.filterArray(filter);
 
         if (conflictPutVal == null &&
-            conflictRmvVer == null &&
-            !isFastMap(filters, op)) {
+            conflictRmvVer == null) {
             return new GridNearAtomicSingleUpdateFuture(
                 ctx,
                 this,
@@ -1752,7 +1764,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     ) {
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
             nodeId,
-            req.futureVersion(),
+            req.futureId(),
             ctx.deploymentEnabled());
 
         res.addFailedKeys(req.keys(), e);
@@ -1772,7 +1784,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         GridNearAtomicAbstractUpdateRequest req,
         CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb
     ) {
-        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
+        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureId(),
             ctx.deploymentEnabled());
 
         assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
@@ -1817,7 +1829,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         if (node == null) {
                             U.warn(msgLog, "Skip near update request, node originated update request left [" +
-                                "futId=" + req.futureVersion() + ", node=" + nodeId + ']');
+                                "futId=" + req.futureId() + ", node=" + nodeId + ']');
 
                             return;
                         }
@@ -1834,7 +1846,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 res.nearVersion(ver);
 
                             if (msgLog.isDebugEnabled()) {
-                                msgLog.debug("Assigned update version [futId=" + req.futureVersion() +
+                                msgLog.debug("Assigned update version [futId=" + req.futureId() +
                                     ", writeVer=" + ver + ']');
                             }
                         }
@@ -1903,7 +1915,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             req.cleanup(!node.isLocal());
 
                         if (dhtFut != null)
-                            ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut);
+                            ctx.mvcc().addAtomicFuture(dhtFut.id(), dhtFut);
                     }
                     else
                         // Should remap all keys.
@@ -3169,7 +3181,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
         if (msgLog.isDebugEnabled()) {
-            msgLog.debug("Received near atomic update request [futId=" + req.futureVersion() +
+            msgLog.debug("Received near atomic update request [futId=" + req.futureId() +
                 ", writeVer=" + req.updateVersion() +
                 ", node=" + nodeId + ']');
         }
@@ -3186,19 +3198,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @SuppressWarnings("unchecked")
     private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
         if (msgLog.isDebugEnabled())
-            msgLog.debug("Received near atomic update response [futId" + res.futureVersion() + ", node=" + nodeId + ']');
+            msgLog.debug("Received near atomic update response [futId=" + res.futureId() + ", node=" + nodeId + ']');
 
         res.nodeId(ctx.localNodeId());
 
         GridNearAtomicAbstractUpdateFuture fut =
-            (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
+            (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
 
         if (fut != null)
             fut.onResult(nodeId, res, false);
 
         else
             U.warn(msgLog, "Failed to find near update future for update response (will ignore) " +
-                "[futId" + res.futureVersion() + ", node=" + nodeId + ", res=" + res + ']');
+                "[futId=" + res.futureId() + ", node=" + nodeId + ", res=" + res + ']');
     }
 
     /**
@@ -3207,14 +3219,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicAbstractUpdateRequest req) {
         if (msgLog.isDebugEnabled()) {
-            msgLog.debug("Received DHT atomic update request [futId=" + req.futureVersion() +
+            msgLog.debug("Received DHT atomic update request [futId=" + req.futureId() +
                 ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
         }
 
         GridCacheVersion ver = req.writeVersion();
 
         // Always send update reply.
-        GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion(),
+        GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureId(),
             ctx.deploymentEnabled());
 
         Boolean replicate = ctx.isDrEnabled();
@@ -3311,36 +3323,44 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 ctx.io().send(nodeId, res, ctx.ioPolicy());
 
                 if (msgLog.isDebugEnabled()) {
-                    msgLog.debug("Sent DHT atomic update response [futId=" + req.futureVersion() +
+                    msgLog.debug("Sent DHT atomic update response [futId=" + req.futureId() +
                         ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
                 }
             }
             else {
                 if (msgLog.isDebugEnabled()) {
-                    msgLog.debug("Will send deferred DHT atomic update response [futId=" + req.futureVersion() +
+                    msgLog.debug("Will send deferred DHT atomic update response [futId=" + req.futureId() +
                         ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
                 }
 
                 // No failed keys and sync mode is not FULL_SYNC, thus sending deferred response.
-                sendDeferredUpdateResponse(nodeId, req.futureVersion());
+                sendDeferredUpdateResponse(nodeId, req.futureId());
             }
         }
         catch (ClusterTopologyCheckedException ignored) {
-            U.warn(msgLog, "Failed to send DHT atomic update response, node left [futId=" + req.futureVersion() +
+            U.warn(msgLog, "Failed to send DHT atomic update response, node left [futId=" + req.futureId() +
                 ", node=" + req.nodeId() + ']');
         }
         catch (IgniteCheckedException e) {
-            U.error(msgLog, "Failed to send DHT atomic update response [futId=" + req.futureVersion() +
+            U.error(msgLog, "Failed to send DHT atomic update response [futId=" + req.futureId() +
                 ", node=" + nodeId +  ", res=" + res + ']', e);
         }
     }
 
     /**
      * @param nodeId Node ID to send message to.
-     * @param ver Version to ack.
+     * @param futId ID to ack.
      */
-    private void sendDeferredUpdateResponse(UUID nodeId, GridCacheVersion ver) {
-        deferredUpdateMsgSnd.sendDeferredAckMessage(nodeId, ver);
+    private void sendDeferredUpdateResponse(UUID nodeId, long futId) {
+        deferredUpdateMsgSnd.sendDeferredAckMessage(nodeId, futId);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Response.
+     */
+    private void processNearAtomicDhtResponse(UUID nodeId, GridNearAtomicDhtResponse res) {
+        // No-op: processing of this response is not implemented in this change.
+    }
 
     /**
@@ -3349,18 +3369,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     @SuppressWarnings("unchecked")
     private void processDhtAtomicUpdateResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) {
-        GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
+        GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
 
         if (updateFut != null) {
             if (msgLog.isDebugEnabled()) {
-                msgLog.debug("Received DHT atomic update response [futId=" + res.futureVersion() +
+                msgLog.debug("Received DHT atomic update response [futId=" + res.futureId() +
                     ", writeVer=" + updateFut.writeVersion() + ", node=" + nodeId + ']');
             }
 
             updateFut.onResult(nodeId, res);
         }
         else {
-            U.warn(msgLog, "Failed to find DHT update future for update response [futId=" + res.futureVersion() +
+            U.warn(msgLog, "Failed to find DHT update future for update response [futId=" + res.futureId() +
                 ", node=" + nodeId + ", res=" + res + ']');
         }
     }
@@ -3371,19 +3391,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     @SuppressWarnings("unchecked")
     private void processDhtAtomicDeferredUpdateResponse(UUID nodeId, GridDhtAtomicDeferredUpdateResponse res) {
-        for (GridCacheVersion ver : res.futureVersions()) {
-            GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(ver);
+        for (Long id : res.futureIds()) {
+            GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(id);
 
             if (updateFut != null) {
                 if (msgLog.isDebugEnabled()) {
-                    msgLog.debug("Received DHT atomic deferred update response [futId=" + ver +
+                    msgLog.debug("Received DHT atomic deferred update response [futId=" + id +
                         ", writeVer=" + res + ", node=" + nodeId + ']');
                 }
 
                 updateFut.onResult(nodeId);
             }
             else {
-                U.warn(msgLog, "Failed to find DHT update future for deferred update response [futId=" + ver +
+                U.warn(msgLog, "Failed to find DHT update future for deferred update response [futId=" + id +
                     ", nodeId=" + nodeId + ", res=" + res + ']');
             }
         }
@@ -3398,16 +3418,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             ctx.io().send(nodeId, res, ctx.ioPolicy());
 
             if (msgLog.isDebugEnabled())
-                msgLog.debug("Sent near update response [futId=" + res.futureVersion() + ", node=" + nodeId + ']');
+                msgLog.debug("Sent near update response [futId=" + res.futureId() + ", node=" + nodeId + ']');
         }
         catch (ClusterTopologyCheckedException ignored) {
             if (msgLog.isDebugEnabled()) {
-                msgLog.debug("Failed to send near update response [futId=" + res.futureVersion() +
+                msgLog.debug("Failed to send near update response [futId=" + res.futureId() +
                     ", node=" + nodeId + ']');
             }
         }
         catch (IgniteCheckedException e) {
-            U.error(msgLog, "Failed to send near update response [futId=" + res.futureVersion() +
+            U.error(msgLog, "Failed to send near update response [futId=" + res.futureId() +
                 ", node=" + nodeId + ", res=" + res + ']', e);
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
index 923b220..b662476 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
@@ -43,7 +43,7 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
 
     /** ACK future versions. */
     @GridDirectCollection(GridCacheVersion.class)
-    private Collection<GridCacheVersion> futVers;
+    private Collection<Long> futIds;
 
     /** {@inheritDoc} */
     @Override public int lookupIndex() {
@@ -61,14 +61,14 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
      * Constructor.
      *
      * @param cacheId Cache ID.
-     * @param futVers Future versions.
+     * @param futIds Future IDs.
      * @param addDepInfo Deployment info.
      */
-    public GridDhtAtomicDeferredUpdateResponse(int cacheId, Collection<GridCacheVersion> futVers, boolean addDepInfo) {
-        assert !F.isEmpty(futVers);
+    public GridDhtAtomicDeferredUpdateResponse(int cacheId, Collection<Long> futIds, boolean addDepInfo) {
+        assert !F.isEmpty(futIds);
 
         this.cacheId = cacheId;
-        this.futVers = futVers;
+        this.futIds = futIds;
         this.addDepInfo = addDepInfo;
     }
 
@@ -78,10 +78,10 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
     }
 
     /**
-     * @return List of ACKed future versions.
+     * @return List of ACKed future ids.
      */
-    public Collection<GridCacheVersion> futureVersions() {
-        return futVers;
+    public Collection<Long> futureIds() {
+        return futIds;
     }
 
     /** {@inheritDoc} */
@@ -105,7 +105,7 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeCollection("futVers", futVers, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("futIds", futIds, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
@@ -127,7 +127,7 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
 
         switch (reader.state()) {
             case 3:
-                futVers = reader.readCollection("futVers", MessageCollectionItemType.MSG);
+                futIds = reader.readCollection("futIds", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 20d6e90..0c8e482 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -98,7 +98,7 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
     /** {@inheritDoc} */
     @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(
         ClusterNode node,
-        GridCacheVersion futVer,
+        long futId,
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
         @NotNull AffinityTopologyVersion topVer,
@@ -110,7 +110,7 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
             return new GridDhtAtomicSingleUpdateRequest(
                 cctx.cacheId(),
                 node.id(),
-                futVer,
+                futId,
                 writeVer,
                 syncMode,
                 topVer,
@@ -124,7 +124,7 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
             return new GridDhtAtomicUpdateRequest(
                 cctx.cacheId(),
                 node.id(),
-                futVer,
+                futId,
                 writeVer,
                 syncMode,
                 topVer,

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index 0af7cf5..127c2be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -52,7 +52,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
     private static final int NEAR_FLAG_MASK = 0x80;
 
     /** Future version. */
-    protected GridCacheVersion futVer;
+    protected long futId;
 
     /** Write version. */
     protected GridCacheVersion writeVer;
@@ -102,7 +102,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
      *
      * @param cacheId Cache ID.
      * @param nodeId Node ID.
-     * @param futVer Future version.
+     * @param futId Future ID.
      * @param writeVer Write version for cache values.
      * @param syncMode Cache write synchronization mode.
      * @param topVer Topology version.
@@ -115,7 +115,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
     GridDhtAtomicSingleUpdateRequest(
         int cacheId,
         UUID nodeId,
-        GridCacheVersion futVer,
+        long futId,
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
         @NotNull AffinityTopologyVersion topVer,
@@ -126,7 +126,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
         boolean skipStore
     ) {
         super(cacheId, nodeId);
-        this.futVer = futVer;
+        this.futId = futId;
         this.writeVer = writeVer;
         this.syncMode = syncMode;
         this.topVer = topVer;
@@ -268,8 +268,8 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheVersion futureVersion() {
-        return futVer;
+    @Override public long futureId() {
+        return futId;
     }
 
     /** {@inheritDoc} */
@@ -430,7 +430,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeMessage("futVer", futVer))
+                if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
@@ -520,7 +520,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
                 reader.incrementState();
 
             case 4:
-                futVer = reader.readMessage("futVer");
+                futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index efb35c4..7cb75fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -123,7 +123,7 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
 
     /** {@inheritDoc} */
     @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(ClusterNode node,
-        GridCacheVersion futVer,
+        long futId,
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
         @NotNull AffinityTopologyVersion topVer,
@@ -134,7 +134,7 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
         return new GridDhtAtomicUpdateRequest(
             cctx.cacheId(),
             node.id(),
-            futVer,
+            futId,
             writeVer,
             syncMode,
             topVer,

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 1854e52..5dfea79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -53,8 +53,8 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Future version. */
-    private GridCacheVersion futVer;
+    /** Future ID. */
+    private long futId;
 
     /** Write version. */
     private GridCacheVersion writeVer;
@@ -163,7 +163,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
      *
      * @param cacheId Cache ID.
      * @param nodeId Node ID.
-     * @param futVer Future version.
+     * @param futId Future ID.
      * @param writeVer Write version for cache values.
      * @param invokeArgs Optional arguments for entry processor.
      * @param syncMode Cache write synchronization mode.
@@ -176,7 +176,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
     public GridDhtAtomicUpdateRequest(
         int cacheId,
         UUID nodeId,
-        GridCacheVersion futVer,
+        long futId,
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
         @NotNull AffinityTopologyVersion topVer,
@@ -190,7 +190,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
     ) {
         super(cacheId, nodeId);
 
-        this.futVer = futVer;
+        this.futId = futId;
         this.writeVer = writeVer;
         this.syncMode = syncMode;
         this.topVer = topVer;
@@ -362,8 +362,8 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheVersion futureVersion() {
-        return futVer;
+    @Override public long futureId() {
+        return futId;
     }
 
     /** {@inheritDoc} */
@@ -641,7 +641,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeMessage("futVer", futVer))
+                if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
@@ -805,7 +805,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 8:
-                futVer = reader.readMessage("futVer");
+                futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index ff12af0..76d28c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -31,7 +31,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -50,7 +49,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
     public static final int CACHE_MSG_IDX = nextIndexId();
 
     /** Future version. */
-    private GridCacheVersion futVer;
+    private long futId;
 
     /** Failed keys. */
     @GridToStringInclude
@@ -78,12 +77,12 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
 
     /**
      * @param cacheId Cache ID.
-     * @param futVer Future version.
+     * @param futId Future ID.
      * @param addDepInfo Deployment info.
      */
-    public GridDhtAtomicUpdateResponse(int cacheId, GridCacheVersion futVer, boolean addDepInfo) {
+    public GridDhtAtomicUpdateResponse(int cacheId, long futId, boolean addDepInfo) {
         this.cacheId = cacheId;
-        this.futVer = futVer;
+        this.futId = futId;
         this.addDepInfo = addDepInfo;
     }
 
@@ -95,8 +94,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
     /**
      * @return Future version.
      */
-    public GridCacheVersion futureVersion() {
-        return futVer;
+    public long futureId() {
+        return futId;
     }
 
     /**
@@ -223,7 +222,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeMessage("futVer", futVer))
+                if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
@@ -267,7 +266,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 5:
-                futVer = reader.readMessage("futVer");
+                futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
index 61deeee..2a17813 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
@@ -70,7 +70,7 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
     protected UUID nodeId;
 
     /** Future version. */
-    protected GridCacheVersion futVer;
+    protected long futId;
 
     /** Update version. Set to non-null if fastMap is {@code true}. */
     private GridCacheVersion updateVer;
@@ -109,7 +109,7 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
      *
      * @param cacheId Cache ID.
      * @param nodeId Node ID.
-     * @param futVer Future version.
+     * @param futId Future ID.
      * @param fastMap Fast map scheme flag.
      * @param updateVer Update version set if fast map is performed.
      * @param topVer Topology version.
@@ -127,7 +127,7 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
     protected GridNearAtomicAbstractSingleUpdateRequest(
         int cacheId,
         UUID nodeId,
-        GridCacheVersion futVer,
+        long futId,
         boolean fastMap,
         @Nullable GridCacheVersion updateVer,
         @NotNull AffinityTopologyVersion topVer,
@@ -142,11 +142,9 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
         boolean clientReq,
         boolean addDepInfo
     ) {
-        assert futVer != null;
-
         this.cacheId = cacheId;
         this.nodeId = nodeId;
-        this.futVer = futVer;
+        this.futId = futId;
         this.updateVer = updateVer;
         this.topVer = topVer;
         this.syncMode = syncMode;
@@ -199,8 +197,8 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
     /**
      * @return Future version.
      */
-    @Override public GridCacheVersion futureVersion() {
-        return futVer;
+    @Override public long futureId() {
+        return futId;
     }
 
     /**
@@ -421,7 +419,7 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeMessage("futVer", futVer))
+                if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
@@ -487,7 +485,7 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
                 reader.incrementState();
 
             case 4:
-                futVer = reader.readMessage("futVer");
+                futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index c92e0f5..42f4bc3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -113,9 +113,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     /** Current topology version. */
     protected AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
 
-    /** */
-    protected GridCacheVersion updVer;
-
     /** Topology version when got mapping error. */
     protected AffinityTopologyVersion mapErrTopVer;
 
@@ -126,7 +123,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     protected CachePartialUpdateCheckedException err;
 
     /** Future ID. */
-    protected GridCacheVersion futVer;
+    protected Long futId;
 
     /** Completion future for a particular topology version. */
     protected GridFutureAdapter<Void> topCompleteFut;
@@ -212,18 +209,18 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             // Cannot remap.
             remapCnt = 1;
 
-            GridCacheVersion futVer = addAtomicFuture(topVer);
+            Long futId = addAtomicFuture(topVer);
 
-            if (futVer != null)
-                map(topVer, futVer);
+            if (futId != null)
+                map(topVer, futId);
         }
     }
 
     /**
      * @param topVer Topology version.
-     * @param futVer Future version
+     * @param futId Future ID.
      */
-    protected abstract void map(AffinityTopologyVersion topVer, GridCacheVersion futVer);
+    protected abstract void map(AffinityTopologyVersion topVer, Long futId);
 
     /**
      * Maps future on ready topology.
@@ -272,7 +269,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
                 cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 
                 if (msgLog.isDebugEnabled()) {
-                    msgLog.debug("Near update fut, sent request [futId=" + req.futureVersion() +
+                    msgLog.debug("Near update fut, sent request [futId=" + req.futureId() +
                         ", writeVer=" + req.updateVersion() +
                         ", node=" + req.nodeId() + ']');
                 }
@@ -282,7 +279,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             }
             catch (IgniteCheckedException e) {
                 if (msgLog.isDebugEnabled()) {
-                    msgLog.debug("Near update fut, failed to send request [futId=" + req.futureVersion() +
+                    msgLog.debug("Near update fut, failed to send request [futId=" + req.futureId() +
                         ", writeVer=" + req.updateVersion() +
                         ", node=" + req.nodeId() +
                         ", err=" + e + ']');
@@ -302,6 +299,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      */
     public abstract void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr);
 
+    public abstract void onResult(UUID nodeId, GridNearAtomicDhtResponse res);
+
     /**
      * @param req Request.
      * @param e Error.
@@ -310,7 +309,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         synchronized (mux) {
             GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
                 req.nodeId(),
-                req.futureVersion(),
+                req.futureId(),
                 cctx.deploymentEnabled());
 
             res.addFailedKeys(req.keys(), e);
@@ -324,22 +323,22 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      * Should be invoked before topology lock released.
      *
      * @param topVer Topology version.
-     * @return Future version in case future added.
+     * @return Future ID, or {@code null} if storing the future failed.
      */
-    protected final GridCacheVersion addAtomicFuture(AffinityTopologyVersion topVer) {
-        GridCacheVersion futVer = cctx.versions().next(topVer);
+    protected final Long addAtomicFuture(AffinityTopologyVersion topVer) {
+        Long futId = cctx.mvcc().atomicFutureId();
 
         synchronized (mux) {
-            assert this.futVer == null : this;
+            assert this.futId == null : this;
             assert this.topVer == AffinityTopologyVersion.ZERO : this;
 
             this.topVer = topVer;
-            this.futVer = futVer;
+            this.futId = futId;
         }
 
-        if (storeFuture() && !cctx.mvcc().addAtomicFuture(futVer, this))
+        if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this))
             return null;
 
-        return futVer;
+        return futId;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index bee2ecd..077c73c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -61,7 +61,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     /**
      * @return Future version.
      */
-    public abstract GridCacheVersion futureVersion();
+    public abstract long futureId();
 
     /**
      * @return Flag indicating whether this is fast-map udpate.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicDhtResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicDhtResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicDhtResponse.java
new file mode 100644
index 0000000..fc99a5b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicDhtResponse.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message with a future ID, update mapping and result flags. TODO IGNITE-4705: no need to send mapping if it == affinity.
+ */
+public class GridNearAtomicDhtResponse extends GridCacheMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Message index. */
+    public static final int CACHE_MSG_IDX = nextIndexId();
+
+    /** Bit in {@link #flags} set when the response carries an operation result. */
+    private static final int HAS_RESULT_MASK = 0x1;
+
+    /** Bit in {@link #flags} holding the success value of the operation result. */
+    private static final int RESULT_SUCCESS_MASK = 0x2;
+
+    /** Future ID. */
+    private long futId;
+
+    /** Update mapping. */
+    @GridDirectCollection(UUID.class)
+    private List<UUID> mapping;
+
+    /** Bit flags, see {@link #HAS_RESULT_MASK} and {@link #RESULT_SUCCESS_MASK}. */
+    private byte flags;
+
+    /**
+     * No-arg constructor; leaves all fields at their default values.
+     */
+    public GridNearAtomicDhtResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     * @param mapping Update mapping.
+     */
+    public GridNearAtomicDhtResponse(long futId, List<UUID> mapping) {
+        this.futId = futId;
+        this.mapping = mapping;
+    }
+
+    /**
+     * @param success Success flag; calling this also marks the response as having a result.
+     */
+    public void setResult(boolean success) {
+        setFlag(true, HAS_RESULT_MASK);
+
+        setFlag(success, RESULT_SUCCESS_MASK);
+    }
+
+    /**
+     * @return Operation result built from the success flag; requires {@link #hasResult()} to be {@code true}.
+     */
+    public GridCacheReturn result() {
+        assert hasResult();
+
+        return new GridCacheReturn(true, isFlag(RESULT_SUCCESS_MASK));
+    }
+
+    /**
+     * @return {@code True} if response contains operation result.
+     */
+    public boolean hasResult() {
+        return isFlag(HAS_RESULT_MASK);
+    }
+
+    /**
+     * @return Update mapping.
+     */
+    public List<UUID> mapping() {
+        return mapping;
+    }
+
+    /**
+     * @param flag {@code True} to set the masked bits, {@code false} to clear them.
+     * @param mask Mask.
+     */
+    private void setFlag(boolean flag, int mask) {
+        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+    }
+
+    /**
+     * Checks whether any bit of the given mask is set in {@link #flags}.
+     *
+     * @param mask Mask to read.
+     * @return Flag value.
+     */
+    private boolean isFlag(int mask) {
+        return (flags & mask) != 0;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public long futureId() {
+        return futId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -45;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 6; // States 3-5 are written here; 0-2 presumably by the superclass - TODO confirm.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) { // No breaks: each case falls through to the next field.
+            case 3:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) { // No breaks: fields are read in the same order writeTo() writes them.
+            case 3:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridNearAtomicDhtResponse.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 08c2474..a8f5a55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -66,7 +66,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     private UUID nodeId;
 
     /** Future version. */
-    private GridCacheVersion futVer;
+    private long futId;
 
     /** Update version. Set to non-null if fastMap is {@code true}. */
     private GridCacheVersion updateVer;
@@ -175,7 +175,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
      *
      * @param cacheId Cache ID.
      * @param nodeId Node ID.
-     * @param futVer Future version.
+     * @param futId Future ID.
      * @param fastMap Fast map scheme flag.
      * @param updateVer Update version set if fast map is performed.
      * @param topVer Topology version.
@@ -197,7 +197,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     GridNearAtomicFullUpdateRequest(
         int cacheId,
         UUID nodeId,
-        GridCacheVersion futVer,
+        long futId,
         boolean fastMap,
         @Nullable GridCacheVersion updateVer,
         @NotNull AffinityTopologyVersion topVer,
@@ -216,11 +216,9 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         boolean addDepInfo,
         int maxEntryCnt
     ) {
-        assert futVer != null;
-
         this.cacheId = cacheId;
         this.nodeId = nodeId;
-        this.futVer = futVer;
+        this.futId = futId;
         this.fastMap = fastMap;
         this.updateVer = updateVer;
 
@@ -276,8 +274,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheVersion futureVersion() {
-        return futVer;
+    @Override public long futureId() {
+        return futId;
     }
 
     /** {@inheritDoc} */
@@ -678,7 +676,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeMessage("futVer", futVer))
+                if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
@@ -854,7 +852,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
                 reader.incrementState();
 
             case 11:
-                futVer = reader.readMessage("futVer");
+                futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
index e0c24b2..c474c83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
@@ -58,7 +58,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
      *
      * @param cacheId Cache ID.
      * @param nodeId Node ID.
-     * @param futVer Future version.
+     * @param futId Future ID.
      * @param fastMap Fast map scheme flag.
      * @param updateVer Update version set if fast map is performed.
      * @param topVer Topology version.
@@ -77,7 +77,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
     GridNearAtomicSingleUpdateFilterRequest(
         int cacheId,
         UUID nodeId,
-        GridCacheVersion futVer,
+        long futId,
         boolean fastMap,
         @Nullable GridCacheVersion updateVer,
         @NotNull AffinityTopologyVersion topVer,
@@ -96,7 +96,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
         super(
             cacheId,
             nodeId,
-            futVer,
+            futId,
             fastMap,
             updateVer,
             topVer,