You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/01/18 17:20:16 UTC

ignite git commit: Reimplemented the atomic single update so that, on DHT updates, the response bypasses the primary node and is sent directly to the client

Repository: ignite
Updated Branches:
  refs/heads/ignite-comm-balance-master 983c9bd48 -> f28fa9ac5


Reimplemented the atomic single update so that, on DHT updates, the response bypasses the primary node and is sent directly to the client.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f28fa9ac
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f28fa9ac
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f28fa9ac

Branch: refs/heads/ignite-comm-balance-master
Commit: f28fa9ac5b08ad40544cb4e7947c9b2a1ef6a087
Parents: 983c9bd
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Wed Jan 18 20:20:04 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Wed Jan 18 20:20:04 2017 +0300

----------------------------------------------------------------------
 .../GridDhtAtomicAbstractUpdateFuture.java      | 14 ++++-
 .../GridDhtAtomicAbstractUpdateRequest.java     |  8 +++
 .../dht/atomic/GridDhtAtomicCache.java          | 22 +++++--
 .../atomic/GridDhtAtomicSingleUpdateFuture.java |  6 +-
 .../GridDhtAtomicSingleUpdateRequest.java       | 61 +++++++++++++-----
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  6 +-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  | 66 +++++++++++++-------
 .../GridNearAtomicAbstractUpdateFuture.java     |  2 +
 .../GridNearAtomicSingleUpdateFuture.java       |  2 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  4 +-
 10 files changed, 139 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f28fa9ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 361fbe2..3ee45ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
@@ -99,6 +100,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     /** Response count. */
     private volatile int resCnt;
 
+    public UUID nearNodeId;
+
     /**
      * @param cctx Cache context.
      * @param completionCb Callback to invoke when future is completed.
@@ -111,10 +114,13 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         GridCacheVersion writeVer,
         GridNearAtomicAbstractUpdateRequest updateReq,
-        GridNearAtomicUpdateResponse updateRes) {
+        GridNearAtomicUpdateResponse updateRes
+    ) {
         this.cctx = cctx;
 
-        futVer = cctx.versions().next(updateReq.topologyVersion());
+        this.futVer = CU.cheatCache(cctx.cacheId()) ? updateReq.futureVersion() :
+            cctx.versions().next(updateReq.topologyVersion());
+
         this.updateReq = updateReq;
         this.completionCb = completionCb;
         this.updateRes = updateRes;
@@ -199,6 +205,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                         conflictExpireTime,
                         conflictVer);
 
+                    updateReq.nearNodeId(nearNodeId);
+
                     mappings.put(nodeId, updateReq);
                 }
 
@@ -450,7 +458,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                     clsr.apply(suc);
             }
 
-            if (updateReq.writeSynchronizationMode() == FULL_SYNC)
+            if (updateReq.writeSynchronizationMode() == FULL_SYNC && !CU.cheatCache(cctx.cacheId()))
                 completionCb.apply(updateReq, updateRes);
 
             return true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f28fa9ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index deb9ce4..21d5c87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -76,6 +76,14 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
         return nodeId;
     }
 
+    public UUID nearNodeId() {
+        return null;
+    }
+
+    public void nearNodeId(UUID nearNodeId) {
+
+    }
+
     /**
      * @return Keep binary flag.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f28fa9ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d52421e..ba0ea89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -680,9 +680,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             true,
             false).get();
 
-        assert res != null;
+        assert res != null || CU.cheatCache(ctx.cacheId());
 
-        return res;
+        return CU.cheatCache(ctx.cacheId()) ? true : res;
     }
 
     /** {@inheritDoc} */
@@ -1905,7 +1905,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (req.writeSynchronizationMode() != FULL_ASYNC)
                             req.cleanup(!node.isLocal());
 
-                        if (dhtFut != null)
+                        if (dhtFut != null && !CU.cheatCache(ctx.cacheId()))
                             ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut);
                     }
                     else
@@ -2517,6 +2517,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                 if (dhtFut != null) {
                     if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
+                        dhtFut.nearNodeId = node.id(); //TODO
+
                         GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
 
                         if (conflictCtx == null)
@@ -3195,7 +3197,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @SuppressWarnings("unchecked")
     private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
         if (msgLog.isDebugEnabled())
-            msgLog.debug("Received near atomic update response [futId" + res.futureVersion() + ", node=" + nodeId + ']');
+            msgLog.debug("Received near atomic update response [futId=" + res.futureVersion() + ", node=" + nodeId + ']');
 
         res.nodeId(ctx.localNodeId());
 
@@ -3319,7 +3321,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         try {
             if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode() == FULL_SYNC) {
-                ctx.io().send(nodeId, res, ctx.ioPolicy());
+                if (CU.cheatCache(ctx.cacheId())) {
+                    ctx.io().send(
+                        req.nearNodeId(),
+                        new GridNearAtomicUpdateResponse(ctx.cacheId(),
+                            req.nearNodeId(),
+                            req.futureVersion(),
+                            false),
+                        ctx.ioPolicy());
+                }
+                else
+                    ctx.io().send(nodeId, res, ctx.ioPolicy());
 
                 if (msgLog.isDebugEnabled()) {
                     msgLog.debug("Sent DHT atomic update response [futId=" + req.futureVersion() +

http://git-wip-us.apache.org/repos/asf/ignite/blob/f28fa9ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 20d6e90..0dc2754 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -67,7 +67,11 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
         GridNearAtomicAbstractUpdateRequest updateReq,
         GridNearAtomicUpdateResponse updateRes
     ) {
-        super(cctx, completionCb, writeVer, updateReq, updateRes);
+        super(cctx,
+            completionCb,
+            writeVer,
+            updateReq,
+            updateRes);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f28fa9ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index a7e6c24..53c3079 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -52,6 +52,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
     private static final int NEAR_FLAG_MASK = 0x80;
 
     /** Future version. */
+    @GridToStringInclude
     protected GridCacheVersion futVer;
 
     /** Write version. */
@@ -88,8 +89,12 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
     protected long updateCntr;
 
     /** */
+    @GridToStringInclude
     protected int partId;
 
+    @GridToStringInclude
+    protected UUID nearNodeId;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -201,6 +206,14 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
         this.val = val;
     }
 
+    public UUID nearNodeId() {
+        return nearNodeId;
+    }
+
+    public void nearNodeId(UUID nearNodeId) {
+        this.nearNodeId = nearNodeId;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean forceTransformBackups() {
         return false;
@@ -441,54 +454,60 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeInt("partId", partId))
+                if (!writer.writeUuid("nearNodeId", nearNodeId))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeMessage("prevVal", prevVal))
+                if (!writer.writeInt("partId", partId))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeMessage("prevVal", prevVal))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeLong("updateCntr", updateCntr))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeMessage("val", val))
+                if (!writer.writeLong("updateCntr", updateCntr))
                     return false;
 
                 writer.incrementState();
 
             case 14:
+                if (!writer.writeMessage("val", val))
+                    return false;
+
+                writer.incrementState();
+
+            case 15:
                 if (!writer.writeMessage("writeVer", writeVer))
                     return false;
 
@@ -535,7 +554,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
                 reader.incrementState();
 
             case 6:
-                partId = reader.readInt("partId");
+                nearNodeId = reader.readUuid("nearNodeId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -543,7 +562,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
                 reader.incrementState();
 
             case 7:
-                prevVal = reader.readMessage("prevVal");
+                partId = reader.readInt("partId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -551,7 +570,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
                 reader.incrementState();
 
             case 8:
-                subjId = reader.readUuid("subjId");
+                prevVal = reader.readMessage("prevVal");
 
                 if (!reader.isLastRead())
                     return false;
@@ -559,6 +578,14 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
                 reader.incrementState();
 
             case 9:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
                 byte syncModeOrd;
 
                 syncModeOrd = reader.readByte("syncMode");
@@ -570,7 +597,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 10:
+            case 11:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -578,7 +605,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 11:
+            case 12:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -586,7 +613,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 12:
+            case 13:
                 updateCntr = reader.readLong("updateCntr");
 
                 if (!reader.isLastRead())
@@ -594,7 +621,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 13:
+            case 14:
                 val = reader.readMessage("val");
 
                 if (!reader.isLastRead())
@@ -602,7 +629,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 14:
+            case 15:
                 writeVer = reader.readMessage("writeVer");
 
                 if (!reader.isLastRead())
@@ -652,7 +679,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 15;
+        return 16;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f28fa9ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index efb35c4..5429adc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -66,7 +66,11 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
         GridNearAtomicAbstractUpdateRequest updateReq,
         GridNearAtomicUpdateResponse updateRes
     ) {
-        super(cctx, completionCb, writeVer, updateReq, updateRes);
+        super(cctx,
+            completionCb,
+            writeVer,
+            updateReq,
+            updateRes);
 
         keys = new ArrayList<>(updateReq.size());
         mappings = U.newHashMap(updateReq.size());

http://git-wip-us.apache.org/repos/asf/ignite/blob/f28fa9ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 7144963..c911a99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -151,6 +151,8 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
     /** Additional flags. */
     private byte flags;
 
+    protected UUID nearNodeId;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -292,6 +294,14 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
             conflictExpireTimes.add(conflictExpireTime);
     }
 
+    public UUID nearNodeId() {
+        return nearNodeId;
+    }
+
+    public void nearNodeId(UUID nearNodeId) {
+        this.nearNodeId = nearNodeId;
+    }
+
     /** {@inheritDoc} */
     @Override public void addNearWriteValue(KeyCacheObject key,
         @Nullable CacheObject val,
@@ -681,66 +691,72 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeMessage("nearTtls", nearTtls))
+                if (!writer.writeUuid("nearNodeId", nearNodeId))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("nearTtls", nearTtls))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 21:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
             case 22:
-                if (!writer.writeMessage("ttls", ttls))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 23:
-                if (!writer.writeMessage("updateCntrs", updateCntrs))
+                if (!writer.writeMessage("ttls", ttls))
                     return false;
 
                 writer.incrementState();
 
             case 24:
-                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("updateCntrs", updateCntrs))
                     return false;
 
                 writer.incrementState();
 
             case 25:
+                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 26:
                 if (!writer.writeMessage("writeVer", writeVer))
                     return false;
 
@@ -859,7 +875,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 15:
-                nearTtls = reader.readMessage("nearTtls");
+                nearNodeId = reader.readUuid("nearNodeId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -867,7 +883,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 16:
-                nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
+                nearTtls = reader.readMessage("nearTtls");
 
                 if (!reader.isLastRead())
                     return false;
@@ -875,7 +891,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 17:
-                prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG);
+                nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -883,7 +899,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 18:
-                subjId = reader.readUuid("subjId");
+                prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -891,6 +907,14 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 19:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 20:
                 byte syncModeOrd;
 
                 syncModeOrd = reader.readByte("syncMode");
@@ -902,7 +926,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 20:
+            case 21:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -910,7 +934,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 21:
+            case 22:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -918,7 +942,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 22:
+            case 23:
                 ttls = reader.readMessage("ttls");
 
                 if (!reader.isLastRead())
@@ -926,7 +950,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 23:
+            case 24:
                 updateCntrs = reader.readMessage("updateCntrs");
 
                 if (!reader.isLastRead())
@@ -934,7 +958,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 24:
+            case 25:
                 vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -942,7 +966,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 25:
+            case 26:
                 writeVer = reader.readMessage("writeVer");
 
                 if (!reader.isLastRead())
@@ -968,7 +992,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 26;
+        return 27;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f28fa9ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index a77facc..862e920 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -135,6 +136,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     protected GridCacheReturn opRes;
 
     /** */
+    @GridToStringInclude
     protected volatile Runnable completer;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f28fa9ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 06076be..da9cb40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -210,7 +210,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             if (!res.futureVersion().equals(futVer))
                 return;
 
-            if (!this.req.nodeId().equals(nodeId))
+            if (!this.req.nodeId().equals(nodeId) && !CU.cheatCache(cctx.cacheId()))
                 return;
 
             req = this.req;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f28fa9ac/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 9601ab1..675e631 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1128,8 +1128,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 Boolean clientMode = ignite.configuration().isClientMode();
 
                 usePairedConnections = !clientMode;
-
-                connectionsPerNode = clientMode ? 1 : 2;
             }
         }
     }
@@ -3328,7 +3326,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * @return {@code True} if given node supports multiple connections per-node for communication.
      */
     private boolean useMultipleConnections(ClusterNode node) {
-        return node.version().compareToIgnoreTimestamp(MULTIPLE_CONN_SINCE_VER) >= 0 && !node.isClient();
+        return node.version().compareToIgnoreTimestamp(MULTIPLE_CONN_SINCE_VER) >= 0;
     }
 
     /**