You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/10 13:55:25 UTC

[1/8] ignite git commit: ignite-4768

Repository: ignite
Updated Branches:
  refs/heads/ignite-4768-1 4b4409189 -> 26316bb06


ignite-4768


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/784b171d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/784b171d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/784b171d

Branch: refs/heads/ignite-4768-1
Commit: 784b171deefbc573e7ec6b59128a274acc307946
Parents: 35dad8f
Author: sboikov <sb...@gridgain.com>
Authored: Thu Mar 9 15:45:55 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Mar 9 15:49:52 2017 +0300

----------------------------------------------------------------------
 .../cache/distributed/GridDistributedTxPrepareRequest.java  | 9 ---------
 .../cache/distributed/dht/GridDhtTxPrepareFuture.java       | 2 --
 .../cache/distributed/dht/GridDhtTxPrepareRequest.java      | 2 --
 .../cache/distributed/near/GridNearLockFuture.java          | 2 ++
 .../near/GridNearOptimisticSerializableTxPrepareFuture.java | 1 -
 .../distributed/near/GridNearOptimisticTxPrepareFuture.java | 1 -
 .../near/GridNearPessimisticTxPrepareFuture.java            | 9 ---------
 .../cache/distributed/near/GridNearTxPrepareRequest.java    | 2 --
 8 files changed, 2 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/784b171d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index e30c456..329dc8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -74,9 +74,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
     /** */
     private static final int SYSTEM_TX_FLAG_MASK = 0x10;
 
-    /** */
-    private static final int MAPPING_KNOWN_FLAG_MASK = 0x20;
-
     /** Collection to message converter. */
     private static final C1<Collection<UUID>, UUIDCollectionMessage> COL_TO_MSG = new C1<Collection<UUID>, UUIDCollectionMessage>() {
         @Override public UUIDCollectionMessage apply(Collection<UUID> uuids) {
@@ -177,7 +174,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
         @Nullable Collection<IgniteTxEntry> reads,
         Collection<IgniteTxEntry> writes,
         Map<UUID, Collection<UUID>> txNodes,
-        boolean mappingKnown,
         boolean retVal,
         boolean last,
         boolean onePhaseCommit,
@@ -202,11 +198,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
         setFlag(tx.isInvalidate(), INVALIDATE_FLAG_MASK);
         setFlag(onePhaseCommit, ONE_PHASE_COMMIT_FLAG_MASK);
         setFlag(last, LAST_REQ_FLAG_MASK);
-        setFlag(mappingKnown, MAPPING_KNOWN_FLAG_MASK);
-    }
-
-    public final boolean mappingKnown() {
-        return isFlag(MAPPING_KNOWN_FLAG_MASK);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/784b171d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 053c3b2..d093b4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1273,7 +1273,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         nearWrites,
                         txNodes,
                         tx.nearXidVersion(),
-                        false,
                         true,
                         tx.onePhaseCommit(),
                         tx.subjectId(),
@@ -1386,7 +1385,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                             nearMapping.writes(),
                             tx.transactionNodes(),
                             tx.nearXidVersion(),
-                            false,
                             true,
                             tx.onePhaseCommit(),
                             tx.subjectId(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/784b171d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index e55d189..8c01302 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -128,7 +128,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
         Collection<IgniteTxEntry> nearWrites,
         Map<UUID, Collection<UUID>> txNodes,
         GridCacheVersion nearXidVer,
-        boolean mappingKnown,
         boolean last,
         boolean onePhaseCommit,
         UUID subjId,
@@ -140,7 +139,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
             null,
             dhtWrites,
             txNodes,
-            mappingKnown,
             retVal,
             last,
             onePhaseCommit,

http://git-wip-us.apache.org/repos/asf/ignite/blob/784b171d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 7b19884..ffc84d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -1181,6 +1181,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
             req.filter(filter, cctx);
 
         if (node.isLocal()) {
+            req.miniId(-1);
+
             if (log.isDebugEnabled())
                 log.debug("Before locally locking near request: " + req);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/784b171d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index bb1609d..f8e0584 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -442,7 +442,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             m.writes(),
             m.near(),
             txMapping.transactionNodes(),
-            false,
             m.last(),
             tx.onePhaseCommit(),
             tx.needReturnValue() && tx.implicit(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/784b171d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 8bb79c1..4233002 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -500,7 +500,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                     m.writes(),
                     m.near(),
                     txMapping.transactionNodes(),
-                    false,
                     m.last(),
                     tx.onePhaseCommit(),
                     tx.needReturnValue() && tx.implicit(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/784b171d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index fb2c2fd..ddee7b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -184,8 +184,6 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
      *
      */
     private void preparePessimistic() {
-        boolean mappingKnown = true;
-
         Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
 
         AffinityTopologyVersion topVer = tx.topologyVersion();
@@ -202,9 +200,6 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
             if (!cacheCtx.isLocal()) {
                 GridDhtPartitionTopology top = cacheCtx.topology();
 
-                if (mappingKnown && (!top.rebalanceFinished(topVer) || cctx.discovery().hasNearCache(cacheCtx.cacheId(), topVer)))
-                    mappingKnown = false;
-
                 nodes = cacheCtx.topology().nodes(cacheCtx.affinity().partition(txEntry.key()), topVer);
             }
             else
@@ -237,9 +232,6 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
         checkOnePhase(txMapping);
 
-        if (mappingKnown && tx.onePhaseCommit())
-            mappingKnown = false;
-
         long timeout = tx.remainingTime();
 
         if (timeout == -1) {
@@ -262,7 +254,6 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                 m.writes(),
                 m.near(),
                 txMapping.transactionNodes(),
-                mappingKnown,
                 true,
                 tx.onePhaseCommit(),
                 tx.needReturnValue() && tx.implicit(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/784b171d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 3eff9e5..cccc7b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -107,7 +107,6 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
         Collection<IgniteTxEntry> writes,
         boolean near,
         Map<UUID, Collection<UUID>> txNodes,
-        boolean mappingKnown,
         boolean last,
         boolean onePhaseCommit,
         boolean retVal,
@@ -123,7 +122,6 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
             reads,
             writes,
             txNodes,
-            mappingKnown,
             retVal,
             last,
             onePhaseCommit,


[8/8] ignite git commit: ignite-4768

Posted by sb...@apache.org.
ignite-4768


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/26316bb0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/26316bb0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/26316bb0

Branch: refs/heads/ignite-4768-1
Commit: 26316bb06d18569676af91c3d41671ebe3346533
Parents: 7cf4b98
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 10 13:14:39 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 10 15:10:12 2017 +0300

----------------------------------------------------------------------
 .../GridDistributedTxFinishRequest.java         | 217 +++++++++++++----
 .../GridDistributedTxFinishResponse.java        |  74 +++++-
 .../GridDistributedTxPrepareResponse.java       |  30 ++-
 .../distributed/dht/GridDhtTxFinishFuture.java  |   9 +-
 .../distributed/dht/GridDhtTxFinishRequest.java | 241 +++++--------------
 .../dht/GridDhtTxFinishResponse.java            |  79 +++---
 .../cache/distributed/dht/GridDhtTxLocal.java   |   9 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   8 +-
 .../dht/GridDhtTxPrepareResponse.java           |  38 +--
 ...arOptimisticSerializableTxPrepareFuture.java |   7 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   4 +-
 .../GridNearPessimisticTxPrepareFuture.java     |   5 +-
 .../near/GridNearTxFinishFuture.java            | 128 ++++++----
 .../near/GridNearTxFinishRequest.java           | 168 ++-----------
 .../near/GridNearTxFinishResponse.java          |  19 +-
 .../near/GridNearTxPrepareFutureAdapter.java    |   4 +-
 .../near/GridNearTxPrepareResponse.java         |  42 ++--
 .../cache/transactions/IgniteTxHandler.java     |  34 +--
 18 files changed, 576 insertions(+), 540 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index 3e47cc9..ab9f0ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -20,18 +20,24 @@ package org.apache.ignite.internal.processors.cache.distributed;
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
 /**
  * Transaction completion message.
  */
@@ -39,6 +45,27 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    protected static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01;
+
+    /** */
+    protected static final int CHECK_COMMITTED_FLAG_MASK = 0x02;
+
+    /** */
+    protected static final int NEED_RETURN_VALUE_FLAG_MASK = 0x04;
+
+    /** */
+    protected static final int SYS_INVALIDATE_FLAG_MASK = 0x08;
+
+    /** */
+    protected static final int EXPLICIT_LOCK_FLAG_MASK = 0x10;
+
+    /** */
+    protected static final int STORE_ENABLED_FLAG_MASK = 0x20;
+
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
+
     /** Future ID. */
     private IgniteUuid futId;
 
@@ -54,14 +81,6 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
     /** Commit flag. */
     private boolean commit;
 
-    /** Sync commit flag. */
-    @Deprecated
-    private boolean syncCommit;
-
-    /** Sync commit flag. */
-    @Deprecated
-    private boolean syncRollback;
-
     /** Min version used as base for completed versions. */
     private GridCacheVersion baseVer;
 
@@ -74,6 +93,18 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
     /** IO policy. */
     private byte plc;
 
+    /** Subject ID. */
+    private UUID subjId;
+
+    /** Task name hash. */
+    private int taskNameHash;
+
+    /** */
+    private byte flags;
+
+    /** Write synchronization mode. */
+    private CacheWriteSynchronizationMode syncMode;
+
     /** Transient TX state. */
     @GridDirectTransient
     private IgniteTxState txState;
@@ -94,8 +125,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
      * @param invalidate Invalidate flag.
      * @param sys System transaction flag.
      * @param plc IO policy.
-     * @param syncCommit Sync commit flag.
-     * @param syncRollback Sync rollback flag.
+     * @param syncMode Write synchronization mode.
      * @param baseVer Base version.
      * @param committedVers Committed versions.
      * @param rolledbackVers Rolled back versions.
@@ -105,39 +135,93 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
     public GridDistributedTxFinishRequest(
         GridCacheVersion xidVer,
         IgniteUuid futId,
+        @NotNull AffinityTopologyVersion topVer,
         @Nullable GridCacheVersion commitVer,
         long threadId,
         boolean commit,
         boolean invalidate,
         boolean sys,
         byte plc,
-        boolean syncCommit,
-        boolean syncRollback,
+        CacheWriteSynchronizationMode syncMode,
         GridCacheVersion baseVer,
         Collection<GridCacheVersion> committedVers,
         Collection<GridCacheVersion> rolledbackVers,
+        @Nullable UUID subjId,
+        int taskNameHash,
         int txSize,
         boolean addDepInfo
     ) {
         super(xidVer, 0, addDepInfo);
+
         assert xidVer != null;
+        assert syncMode != null;
 
         this.futId = futId;
+        this.topVer = topVer;
         this.commitVer = commitVer;
         this.threadId = threadId;
         this.commit = commit;
         this.invalidate = invalidate;
         this.sys = sys;
         this.plc = plc;
-        this.syncCommit = syncCommit;
-        this.syncRollback = syncRollback;
+        this.syncMode = syncMode;
         this.baseVer = baseVer;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
         this.txSize = txSize;
 
         completedVersions(committedVers, rolledbackVers);
     }
 
     /**
+     * @return Transaction write synchronization mode (can be null is message sent from old nodes).
+     */
+    public final CacheWriteSynchronizationMode syncMode() {
+        return syncMode;
+    }
+
+    /**
+     * Sets flag mask.
+     *
+     * @param flag Set or clear.
+     * @param mask Mask.
+     */
+    protected final void setFlag(boolean flag, int mask) {
+        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+    }
+
+    /**
+     * Reags flag mask.
+     *
+     * @param mask Mask to read.
+     * @return Flag value.
+     */
+    protected final boolean isFlag(int mask) {
+        return (flags & mask) != 0;
+    }
+
+    /**
+     * @return Subject ID.
+     */
+    @Nullable public final UUID subjectId() {
+        return subjId;
+    }
+
+    /**
+     * @return Task name hash.
+     */
+    public final int taskNameHash() {
+        return taskNameHash;
+    }
+
+    /**
+     * @return Topology version.
+     */
+    @Override public final AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
      * @return System transaction flag.
      */
     public boolean system() {
@@ -188,27 +272,6 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
     }
 
     /**
-     * @return Sync commit flag.
-     */
-    public boolean syncCommit() {
-        return syncCommit;
-    }
-
-    /**
-     * @param syncCommit Sync commit flag.
-     */
-    public void syncCommit(boolean syncCommit) {
-        this.syncCommit = syncCommit;
-    }
-
-    /**
-     * @return Sync rollback flag.
-     */
-    public boolean syncRollback() {
-        return syncRollback;
-    }
-
-    /**
      * @return Base version.
      */
     public GridCacheVersion baseVersion() {
@@ -227,7 +290,9 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
      * @return {@code True} if reply is required.
      */
     public boolean replyRequired() {
-        return commit ? syncCommit : syncRollback;
+        assert syncMode != null;
+
+        return syncMode == FULL_SYNC;
     }
 
     /** {@inheritDoc} */
@@ -279,48 +344,66 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeIgniteUuid("futId", futId))
+                if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeBoolean("invalidate", invalidate))
+                if (!writer.writeIgniteUuid("futId", futId))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeByte("plc", plc))
+                if (!writer.writeBoolean("invalidate", invalidate))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeBoolean("syncCommit", syncCommit))
+                if (!writer.writeByte("plc", plc))
                     return false;
 
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeBoolean("syncRollback", syncRollback))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeBoolean("sys", sys))
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeLong("threadId", threadId))
+                if (!writer.writeBoolean("sys", sys))
                     return false;
 
                 writer.incrementState();
 
             case 17:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 18:
+                if (!writer.writeLong("threadId", threadId))
+                    return false;
+
+                writer.incrementState();
+
+            case 19:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 20:
                 if (!writer.writeInt("txSize", txSize))
                     return false;
 
@@ -367,7 +450,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
                 reader.incrementState();
 
             case 10:
-                futId = reader.readIgniteUuid("futId");
+                flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
                     return false;
@@ -375,7 +458,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
                 reader.incrementState();
 
             case 11:
-                invalidate = reader.readBoolean("invalidate");
+                futId = reader.readIgniteUuid("futId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -383,7 +466,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
                 reader.incrementState();
 
             case 12:
-                plc = reader.readByte("plc");
+                invalidate = reader.readBoolean("invalidate");
 
                 if (!reader.isLastRead())
                     return false;
@@ -391,7 +474,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
                 reader.incrementState();
 
             case 13:
-                syncCommit = reader.readBoolean("syncCommit");
+                plc = reader.readByte("plc");
 
                 if (!reader.isLastRead())
                     return false;
@@ -399,7 +482,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
                 reader.incrementState();
 
             case 14:
-                syncRollback = reader.readBoolean("syncRollback");
+                subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -407,15 +490,19 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
                 reader.incrementState();
 
             case 15:
-                sys = reader.readBoolean("sys");
+                byte syncModeOrd;
+
+                syncModeOrd = reader.readByte("syncMode");
 
                 if (!reader.isLastRead())
                     return false;
 
+                syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
                 reader.incrementState();
 
             case 16:
-                threadId = reader.readLong("threadId");
+                sys = reader.readBoolean("sys");
 
                 if (!reader.isLastRead())
                     return false;
@@ -423,6 +510,30 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
                 reader.incrementState();
 
             case 17:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 18:
+                threadId = reader.readLong("threadId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 19:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 20:
                 txSize = reader.readInt("txSize");
 
                 if (!reader.isLastRead())
@@ -442,7 +553,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 18;
+        return 21;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
index 109d665..1f61033 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
@@ -20,10 +20,12 @@ package org.apache.ignite.internal.processors.cache.distributed;
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -41,25 +43,59 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
     /** Future ID. */
     private IgniteUuid futId;
 
+    /** */
+    @GridToStringExclude
+    private byte flags;
+
+    /** */
+    private int part;
+
     /**
-     * Empty constructor required by {@link Externalizable}.
+     * Empty constructor required by {@link GridIoMessageFactory}.
      */
     public GridDistributedTxFinishResponse() {
         /* No-op. */
     }
 
     /**
+     * @param part Partition.
      * @param txId Transaction id.
      * @param futId Future ID.
      */
-    public GridDistributedTxFinishResponse(GridCacheVersion txId, IgniteUuid futId) {
+    public GridDistributedTxFinishResponse(int part, GridCacheVersion txId, IgniteUuid futId) {
         assert txId != null;
         assert futId != null;
 
+        this.part = part;
         this.txId = txId;
         this.futId = futId;
     }
 
+    /** {@inheritDoc} */
+    @Override public final int partition() {
+        return part;
+    }
+
+    /**
+     * Sets flag mask.
+     *
+     * @param flag Set or clear.
+     * @param mask Mask.
+     */
+    protected final void setFlag(boolean flag, int mask) {
+        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+    }
+
+    /**
+     * Reags flag mask.
+     *
+     * @param mask Mask to read.
+     * @return Flag value.
+     */
+    protected final boolean isFlag(int mask) {
+        return (flags & mask) != 0;
+    }
+
     /**
      *
      * @return Transaction id.
@@ -101,12 +137,24 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeIgniteUuid("futId", futId))
+                if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
             case 4:
+                if (!writer.writeIgniteUuid("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeInt("part", part))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
                 if (!writer.writeMessage("txId", txId))
                     return false;
 
@@ -129,7 +177,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
 
         switch (reader.state()) {
             case 3:
-                futId = reader.readIgniteUuid("futId");
+                flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
                     return false;
@@ -137,6 +185,22 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
                 reader.incrementState();
 
             case 4:
+                futId = reader.readIgniteUuid("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                part = reader.readInt("part");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
                 txId = reader.readMessage("txId");
 
                 if (!reader.isLastRead())
@@ -156,7 +220,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 5;
+        return 7;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
index c19b8c1..53a1391 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@ -184,6 +184,18 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
 
                 writer.incrementState();
 
+            case 8:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeInt("part", part))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -208,6 +220,22 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
 
                 reader.incrementState();
 
+            case 8:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                part = reader.readInt("part");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridDistributedTxPrepareResponse.class);
@@ -220,7 +248,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 8;
+        return 10;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 5d4e610..7011996 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -325,8 +325,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                 tx.system(),
                 tx.ioPolicy(),
                 tx.isSystemInvalidate(),
-                sync,
-                sync,
+                sync ? FULL_SYNC : tx.syncMode(),
                 tx.completedBase(),
                 tx.committedVersions(),
                 tx.rolledbackVersions(),
@@ -426,8 +425,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                 tx.system(),
                 tx.ioPolicy(),
                 tx.isSystemInvalidate(),
-                sync,
-                sync,
+                sync ? FULL_SYNC : tx.syncMode(),
                 tx.completedBase(),
                 tx.committedVersions(),
                 tx.rolledbackVersions(),
@@ -497,8 +495,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                     tx.system(),
                     tx.ioPolicy(),
                     tx.isSystemInvalidate(),
-                    sync,
-                    sync,
+                    sync ? FULL_SYNC : tx.syncMode(),
                     tx.completedBase(),
                     tx.committedVersions(),
                     tx.rolledbackVersions(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index c618a18..7194848 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.UUID;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
@@ -43,12 +44,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** */
-    public static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01;
-
-    /** */
-    public static final int NEED_RETURN_VALUE_FLAG_MASK = 0x02;
-
     /** Near node ID. */
     private UUID nearNodeId;
 
@@ -58,20 +53,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     /** Mini future ID. */
     private IgniteUuid miniId;
 
-    /** System invalidation flag. */
-    private boolean sysInvalidate;
-
-    /** Topology version. */
-    private AffinityTopologyVersion topVer;
-
     /** Pending versions with order less than one for this message (needed for commit ordering). */
     @GridToStringInclude
     @GridDirectCollection(GridCacheVersion.class)
     private Collection<GridCacheVersion> pendingVers;
 
-    /** Check committed flag. */
-    private boolean checkCommitted;
-
     /** Partition update counter. */
     @GridToStringInclude
     @GridDirectCollection(Long.class)
@@ -80,15 +66,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     /** One phase commit write version. */
     private GridCacheVersion writeVer;
 
-    /** Subject ID. */
-    private UUID subjId;
-
-    /** Task name hash. */
-    private int taskNameHash;
-
-    /** */
-    private byte flags;
-
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -110,8 +87,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
      * @param sys System flag.
      * @param plc IO policy.
      * @param sysInvalidate System invalidation flag.
-     * @param syncCommit Synchronous commit flag.
-     * @param syncRollback Synchronous rollback flag.
+     * @param syncMode Write synchronization mode.
      * @param baseVer Base version.
      * @param committedVers Committed versions.
      * @param rolledbackVers Rolled back versions.
@@ -135,8 +111,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
         boolean sys,
         byte plc,
         boolean sysInvalidate,
-        boolean syncCommit,
-        boolean syncRollback,
+        CacheWriteSynchronizationMode syncMode,
         GridCacheVersion baseVer,
         Collection<GridCacheVersion> committedVers,
         Collection<GridCacheVersion> rolledbackVers,
@@ -151,17 +126,19 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
         super(
             xidVer,
             futId,
+            topVer,
             commitVer,
             threadId,
             commit,
             invalidate,
             sys,
             plc,
-            syncCommit,
-            syncRollback,
+            syncMode,
             baseVer,
             committedVers,
             rolledbackVers,
+            subjId,
+            taskNameHash,
             txSize,
             addDepInfo);
 
@@ -170,16 +147,13 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
         assert isolation != null;
 
         this.pendingVers = pendingVers;
-        this.topVer = topVer;
         this.nearNodeId = nearNodeId;
         this.isolation = isolation;
         this.miniId = miniId;
-        this.sysInvalidate = sysInvalidate;
-        this.subjId = subjId;
-        this.taskNameHash = taskNameHash;
 
         needReturnValue(retVal);
         waitRemoteTransactions(waitRemoteTxs);
+        systemInvalidate(sysInvalidate);
     }
 
     /**
@@ -196,8 +170,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
      * @param sys System flag.
      * @param plc IO policy.
      * @param sysInvalidate System invalidation flag.
-     * @param syncCommit Synchronous commit flag.
-     * @param syncRollback Synchronous rollback flag.
+     * @param syncMode Write synchronization mode.
      * @param baseVer Base version.
      * @param committedVers Committed versions.
      * @param rolledbackVers Rolled back versions.
@@ -222,8 +195,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
         boolean sys,
         byte plc,
         boolean sysInvalidate,
-        boolean syncCommit,
-        boolean syncRollback,
+        CacheWriteSynchronizationMode syncMode,
         GridCacheVersion baseVer,
         Collection<GridCacheVersion> committedVers,
         Collection<GridCacheVersion> rolledbackVers,
@@ -236,9 +208,30 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
         boolean retVal,
         boolean waitRemoteTxs
     ) {
-        this(nearNodeId, futId, miniId, topVer, xidVer, commitVer, threadId, isolation, commit, invalidate, sys, plc,
-            sysInvalidate, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, pendingVers, txSize,
-            subjId, taskNameHash, addDepInfo, retVal, waitRemoteTxs);
+        this(nearNodeId,
+            futId,
+            miniId,
+            topVer,
+            xidVer,
+            commitVer,
+            threadId,
+            isolation,
+            commit,
+            invalidate,
+            sys,
+            plc,
+            sysInvalidate,
+            syncMode,
+            baseVer,
+            committedVers,
+            rolledbackVers,
+            pendingVers,
+            txSize,
+            subjId,
+            taskNameHash,
+            addDepInfo,
+            retVal,
+            waitRemoteTxs);
 
         if (updateIdxs != null && !updateIdxs.isEmpty()) {
             partUpdateCnt = new GridLongList(updateIdxs.size());
@@ -263,20 +256,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     }
 
     /**
-     * @return Subject ID.
-     */
-    @Nullable public UUID subjectId() {
-        return subjId;
-    }
-
-    /**
-     * @return Task name hash.
-     */
-    public int taskNameHash() {
-        return taskNameHash;
-    }
-
-    /**
      * @return Transaction isolation.
      */
     public TransactionIsolation isolation() {
@@ -294,7 +273,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
      * @return System invalidate flag.
      */
     public boolean isSystemInvalidate() {
-        return sysInvalidate;
+        return isFlag(SYS_INVALIDATE_FLAG_MASK);
+    }
+
+    /**
+     * @param sysInvalidate System invalidation flag.
+     */
+    private void systemInvalidate(boolean sysInvalidate) {
+        setFlag(sysInvalidate, SYS_INVALIDATE_FLAG_MASK);
     }
 
     /**
@@ -312,63 +298,45 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     }
 
     /**
-     * @return Topology version.
-     */
-    @Override public AffinityTopologyVersion topologyVersion() {
-        return topVer;
-    }
-
-    /**
      * @return Check committed flag.
      */
     public boolean checkCommitted() {
-        return checkCommitted;
+        return isFlag(CHECK_COMMITTED_FLAG_MASK);
     }
 
     /**
      * @param checkCommitted Check committed flag.
      */
     public void checkCommitted(boolean checkCommitted) {
-        this.checkCommitted = checkCommitted;
+        setFlag(checkCommitted, CHECK_COMMITTED_FLAG_MASK);
     }
 
     /**
      * @return {@code True}
      */
     public boolean waitRemoteTransactions() {
-        return (flags & WAIT_REMOTE_TX_FLAG_MASK) != 0;
+        return isFlag(WAIT_REMOTE_TX_FLAG_MASK);
     }
 
     /**
      * @param waitRemoteTxs Wait remote transactions flag.
      */
-    public void waitRemoteTransactions(boolean waitRemoteTxs) {
-        if (waitRemoteTxs)
-            flags = (byte)(flags | WAIT_REMOTE_TX_FLAG_MASK);
-        else
-            flags &= ~WAIT_REMOTE_TX_FLAG_MASK;
+    private void waitRemoteTransactions(boolean waitRemoteTxs) {
+        setFlag(waitRemoteTxs, WAIT_REMOTE_TX_FLAG_MASK);
     }
 
     /**
      * @return Flag indicating whether transaction needs return value.
      */
     public boolean needReturnValue() {
-        return (flags & NEED_RETURN_VALUE_FLAG_MASK) != 0;
+        return isFlag(NEED_RETURN_VALUE_FLAG_MASK);
     }
 
     /**
      * @param retVal Need return value.
      */
     public void needReturnValue(boolean retVal) {
-        if (retVal)
-            flags = (byte)(flags | NEED_RETURN_VALUE_FLAG_MASK);
-        else
-            flags &= ~NEED_RETURN_VALUE_FLAG_MASK;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtTxFinishRequest.class, this, super.toString());
+        setFlag(retVal, NEED_RETURN_VALUE_FLAG_MASK);
     }
 
     /** {@inheritDoc} */
@@ -386,73 +354,37 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
         }
 
         switch (writer.state()) {
-            case 18:
-                if (!writer.writeBoolean("checkCommitted", checkCommitted))
-                    return false;
-
-                writer.incrementState();
-
-            case 19:
-                if (!writer.writeByte("flags", flags))
-                    return false;
-
-                writer.incrementState();
-
-            case 20:
-                if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
-                    return false;
-
-                writer.incrementState();
-
             case 21:
-                if (!writer.writeIgniteUuid("miniId", miniId))
+                if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 22:
-                if (!writer.writeUuid("nearNodeId", nearNodeId))
+                if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
             case 23:
-                if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
+                if (!writer.writeUuid("nearNodeId", nearNodeId))
                     return false;
 
                 writer.incrementState();
 
             case 24:
-                if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
                     return false;
 
                 writer.incrementState();
 
             case 25:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 26:
-                if (!writer.writeBoolean("sysInvalidate", sysInvalidate))
-                    return false;
-
-                writer.incrementState();
-
-            case 27:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
-                    return false;
-
-                writer.incrementState();
-
-            case 28:
-                if (!writer.writeMessage("topVer", topVer))
-                    return false;
-
-                writer.incrementState();
-
-            case 29:
                 if (!writer.writeMessage("writeVer", writeVer))
                     return false;
 
@@ -474,23 +406,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
             return false;
 
         switch (reader.state()) {
-            case 18:
-                checkCommitted = reader.readBoolean("checkCommitted");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 19:
-                flags = reader.readByte("flags");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 20:
+            case 21:
                 byte isolationOrd;
 
                 isolationOrd = reader.readByte("isolation");
@@ -502,16 +418,8 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
 
                 reader.incrementState();
 
-            case 21:
-                miniId = reader.readIgniteUuid("miniId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
             case 22:
-                nearNodeId = reader.readUuid("nearNodeId");
+                miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -519,7 +427,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
                 reader.incrementState();
 
             case 23:
-                partUpdateCnt = reader.readMessage("partUpdateCnt");
+                nearNodeId = reader.readUuid("nearNodeId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -527,7 +435,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
                 reader.incrementState();
 
             case 24:
-                pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
+                partUpdateCnt = reader.readMessage("partUpdateCnt");
 
                 if (!reader.isLastRead())
                     return false;
@@ -535,7 +443,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
                 reader.incrementState();
 
             case 25:
-                subjId = reader.readUuid("subjId");
+                pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -543,30 +451,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
                 reader.incrementState();
 
             case 26:
-                sysInvalidate = reader.readBoolean("sysInvalidate");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 27:
-                taskNameHash = reader.readInt("taskNameHash");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 28:
-                topVer = reader.readMessage("topVer");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 29:
                 writeVer = reader.readMessage("writeVer");
 
                 if (!reader.isLastRead())
@@ -586,6 +470,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 30;
+        return 27;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtTxFinishRequest.class, this, super.toString());
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
index 8fb1f4e..4808289 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
@@ -39,6 +39,12 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private static final int NEAR_RES_FLAG_MASK = 0x01;
+
+    /** Flag indicating if this is a check-committed response. */
+    private static final int CHECK_COMMITTED_FLAG_MASK = 0x02;
+
     /** Mini future ID. */
     private IgniteUuid miniId;
 
@@ -49,9 +55,6 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
     /** Serialized error. */
     private byte[] checkCommittedErrBytes;
 
-    /** Flag indicating if this is a check-committed response. */
-    private boolean checkCommitted;
-
     /** Cache return value. */
     private GridCacheReturn retVal;
 
@@ -63,12 +66,13 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
     }
 
     /**
+     * @param part Partition.
      * @param xid Xid version.
      * @param futId Future ID.
      * @param miniId Mini future ID.
      */
-    public GridDhtTxFinishResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId) {
-        super(xid, futId);
+    public GridDhtTxFinishResponse(int part, GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId) {
+        super(part, xid, futId);
 
         assert miniId != null;
 
@@ -76,6 +80,20 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
     }
 
     /**
+     * @return {@code True} if this is reply for near node.
+     */
+    public boolean nearNodeResponse() {
+        return isFlag(NEAR_RES_FLAG_MASK);
+    }
+
+    /**
+     * @param val {@code True} if this is reply for near node.
+     */
+    public void nearNodeResponse(boolean val) {
+        setFlag(val, NEAR_RES_FLAG_MASK);
+    }
+
+    /**
      * @return Mini future ID.
      */
     public IgniteUuid miniId() {
@@ -100,14 +118,14 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
      * @return Check committed flag.
      */
     public boolean checkCommitted() {
-        return checkCommitted;
+        return isFlag(CHECK_COMMITTED_FLAG_MASK);
     }
 
     /**
      * @param checkCommitted Check committed flag.
      */
     public void checkCommitted(boolean checkCommitted) {
-        this.checkCommitted = checkCommitted;
+        setFlag(checkCommitted, CHECK_COMMITTED_FLAG_MASK);
     }
 
     /** {@inheritDoc} */
@@ -158,11 +176,6 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
     }
 
     /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtTxFinishResponse.class, this, super.toString());
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 
@@ -177,25 +190,19 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
         }
 
         switch (writer.state()) {
-            case 5:
-                if (!writer.writeBoolean("checkCommitted", checkCommitted))
-                    return false;
-
-                writer.incrementState();
-
-            case 6:
+            case 7:
                 if (!writer.writeByteArray("checkCommittedErrBytes", checkCommittedErrBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 7:
+            case 8:
                 if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
-            case 8:
+            case 9:
                 if (!writer.writeMessage("retVal", retVal))
                     return false;
 
@@ -217,15 +224,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
             return false;
 
         switch (reader.state()) {
-            case 5:
-                checkCommitted = reader.readBoolean("checkCommitted");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 6:
+            case 7:
                 checkCommittedErrBytes = reader.readByteArray("checkCommittedErrBytes");
 
                 if (!reader.isLastRead())
@@ -233,7 +232,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
 
                 reader.incrementState();
 
-            case 7:
+            case 8:
                 miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
@@ -241,7 +240,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
 
                 reader.incrementState();
 
-            case 8:
+            case 9:
                 retVal = reader.readMessage("retVal");
 
                 if (!reader.isLastRead())
@@ -261,6 +260,20 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 9;
+        return 10;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        StringBuilder flags = new StringBuilder();
+
+        if (checkCommitted())
+            appendFlag(flags, "checkComm");
+        if (nearNodeResponse())
+            appendFlag(flags, "nearRes");
+
+        return S.toString(GridDhtTxFinishResponse.class, this,
+            "flags", flags.toString(),
+            "super", super.toString());
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 79c371c..c2c2918 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -641,8 +641,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
                 return;
             }
 
-            GridNearTxFinishResponse res = new GridNearTxFinishResponse(nearXidVer, threadId, nearFinFutId,
-                nearFinMiniId, err);
+            GridNearTxFinishResponse res = new GridNearTxFinishResponse(
+                -1,
+                nearXidVer,
+                threadId,
+                nearFinFutId,
+                nearFinMiniId,
+                err);
 
             try {
                 cctx.io().send(nearNodeId, res, ioPolicy());

http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 7645d58..ca028f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1252,8 +1252,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
             if (last) {
                 int miniId = 0;
 
-                assert tx.transactionNodes() != null;
-
                 final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
 
                 // Create mini futures.
@@ -1288,7 +1286,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
                     GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
                         futId,
-                        fut != null ? fut.futureId() : null,
+                        fut != null ? fut.futureId() : 0,
                         nearFutId,
                         nearMiniId,
                         tx.topologyVersion(),
@@ -1315,8 +1313,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                             GridCacheContext<?, ?> cacheCtx = cached.context();
 
                             // Do not invalidate near entry on originating transaction node.
-                            req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) &&
-                                cached.readerId(n.id()) != null);
+                            req.invalidateNearEntry(idx,
+                                !tx.nearNodeId().equals(n.id()) && cached.readerId(n.id()) != null);
 
                             if (cached.isNewLocked()) {
                                 List<ClusterNode> owners = cacheCtx.topology().owners(cached.partition(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index e7153da..2fd72f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -247,11 +247,6 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     }
 
     /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtTxPrepareResponse.class, this, "super", super.toString());
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 
@@ -266,31 +261,31 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
         }
 
         switch (writer.state()) {
-            case 8:
+            case 10:
                 if (!writer.writeIgniteUuid("futId", futId))
                     return false;
 
                 writer.incrementState();
 
-            case 9:
+            case 11:
                 if (!writer.writeMap("invalidParts", invalidParts, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
                     return false;
 
                 writer.incrementState();
 
-            case 10:
+            case 12:
                 if (!writer.writeInt("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
-            case 11:
+            case 13:
                 if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 12:
+            case 14:
                 if (!writer.writeCollection("preloadEntries", preloadEntries, MessageCollectionItemType.MSG))
                     return false;
 
@@ -312,7 +307,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
             return false;
 
         switch (reader.state()) {
-            case 8:
+            case 10:
                 futId = reader.readIgniteUuid("futId");
 
                 if (!reader.isLastRead())
@@ -320,7 +315,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
 
                 reader.incrementState();
 
-            case 9:
+            case 11:
                 invalidParts = reader.readMap("invalidParts", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
 
                 if (!reader.isLastRead())
@@ -328,7 +323,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
 
                 reader.incrementState();
 
-            case 10:
+            case 12:
                 miniId = reader.readInt("miniId");
 
                 if (!reader.isLastRead())
@@ -336,7 +331,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
 
                 reader.incrementState();
 
-            case 11:
+            case 13:
                 nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -344,7 +339,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
 
                 reader.incrementState();
 
-            case 12:
+            case 14:
                 preloadEntries = reader.readCollection("preloadEntries", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -364,6 +359,17 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 13;
+        return 15;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        StringBuilder flags = new StringBuilder();
+
+        if (nearNodeResponse())
+            appendFlag(flags, "nearRes");
+
+        return S.toString(GridDhtTxPrepareResponse.class, this,
+            "super", super.toString());
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 652eec2..cd0e7fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -195,7 +195,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     }
 
     /** {@inheritDoc} */
-    @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+    @Override public void onPrimaryResponse(UUID nodeId, GridNearTxPrepareResponse res) {
         if (!isDone()) {
             MiniFuture mini = miniFuture(res.miniId());
 
@@ -885,7 +885,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                             onDone(res);
                     }
                     else {
-                        parent.onPrimaryPrepareResponse(m, res);
+                        parent.processPrimaryPrepareResponse(m, res);
 
                         // Finish this mini future (need result only on client node).
                         onDone(parent.cctx.kernalContext().clientNode() ? res : null);
@@ -899,8 +899,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
          */
         private void remap(final GridNearTxPrepareResponse res) {
             parent.prepareOnTopology(true, new Runnable() {
-                @Override
-                public void run() {
+                @Override public void run() {
                     onDone(res);
                 }
             });

http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 5cfb1a2..d321cb9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -174,7 +174,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
     }
 
     /** {@inheritDoc} */
-    @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+    @Override public void onPrimaryResponse(UUID nodeId, GridNearTxPrepareResponse res) {
         if (!isDone()) {
             MiniFuture mini = miniFuture(res.miniId());
 
@@ -918,7 +918,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                             remap();
                     }
                     else {
-                        parent.onPrimaryPrepareResponse(m, res);
+                        parent.processPrimaryPrepareResponse(m, res);
 
                         // Proceed prepare before finishing mini future.
                         if (mappings != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index b476336..03ee7af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -95,7 +95,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
     }
 
     /** {@inheritDoc} */
-    @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+    @Override public void onPrimaryResponse(UUID nodeId, GridNearTxPrepareResponse res) {
         if (!isDone()) {
             assert res.clientRemapVersion() == null : res;
 
@@ -205,7 +205,6 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
      */
     private void preparePessimistic() {
         // TODO IGNITE-4768: need detect on lock step?
-
         boolean dhtReplyNear = true;
 
         Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
@@ -473,7 +472,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
             else {
                 assert dhtNodes == null;
 
-                onPrimaryPrepareResponse(m, res);
+                processPrimaryPrepareResponse(m, res);
 
                 onDone();
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 06cf878..1d52b6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -148,7 +148,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             if (isMini(fut)) {
                 MinFuture f = (MinFuture)fut;
 
-                if (f.onNodeLeft(nodeId, true)) {
+                if (f.onNodeLeft(nodeId)) {
                     // Remove previous mapping.
                     mappings.remove(nodeId);
 
@@ -184,7 +184,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
      * @param res Result.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    public void onResult(UUID nodeId, GridNearTxFinishResponse res) {
+    public void onPrimaryResponse(UUID nodeId, GridNearTxFinishResponse res) {
         if (!isDone()) {
             FinishMiniFuture finishFut = null;
 
@@ -209,7 +209,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             }
 
             if (finishFut != null)
-                finishFut.onNearFinishResponse(res);
+                finishFut.onPrimaryResponse(res);
             else {
                 if (msgLog.isDebugEnabled()) {
                     msgLog.debug("Near finish fut, failed to find mini future [txId=" + tx.nearXidVersion() +
@@ -233,34 +233,64 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
      * @param nodeId Sender.
      * @param res Result.
      */
-    public void onResult(UUID nodeId, GridDhtTxFinishResponse res) {
+    public void onDhtResponse(UUID nodeId, GridDhtTxFinishResponse res) {
+        assert res.checkCommitted() ^ res.nearNodeResponse() : res;
+
         if (!isDone()) {
-            boolean found = false;
+            MinFuture foundFut = null;
+
+            synchronized (sync) {
+                int size = futuresCountNoLock();
 
-            for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) {
-                if (fut.getClass() == CheckBackupMiniFuture.class) {
-                    CheckBackupMiniFuture f = (CheckBackupMiniFuture)fut;
+                for (int i = 0; i < size; i++) {
+                    IgniteInternalFuture<IgniteInternalTx> fut = future(i);
 
-                    if (f.futureId().equals(res.miniId())) {
-                        found = true;
+                    if (res.nearNodeResponse()) {
+                        assert tx.dhtReplyNear() && tx.syncMode() == FULL_SYNC;
 
-                        assert f.node().id().equals(nodeId);
+                        if (fut.getClass() == FinishMiniFuture.class) {
+                            FinishMiniFuture f = (FinishMiniFuture)fut;
 
-                        if (res.returnValue() != null)
-                            tx.implicitSingleResult(res.returnValue());
+                            if (f.futureId().equals(res.miniId())) {
+                                foundFut = (MinFuture)fut;
 
-                        f.onDhtFinishResponse(res);
+                                break;
+                            }
+                        }
                     }
-                }
-                else if (fut.getClass() == CheckRemoteTxMiniFuture.class) {
-                    CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut;
+                    else {
+                        assert res.checkCommitted();
+
+                        if (fut.getClass() == CheckBackupMiniFuture.class) {
+                            CheckBackupMiniFuture f = (CheckBackupMiniFuture)fut;
+
+                            if (f.futureId().equals(res.miniId())) {
+                                assert f.node().id().equals(nodeId);
 
-                    if (f.futureId().equals(res.miniId()))
-                        f.onDhtFinishResponse(nodeId);
+                                foundFut = f;
+
+                                if (res.returnValue() != null)
+                                    tx.implicitSingleResult(res.returnValue());
+
+                                break;
+                            }
+                        }
+                        else if (fut.getClass() == CheckRemoteTxMiniFuture.class) {
+                            CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut;
+
+                            if (f.futureId().equals(res.miniId())) {
+                                foundFut = f;
+
+                                break;
+                            }
+                        }
+                    }
                 }
             }
 
-            if (!found && msgLog.isDebugEnabled()) {
+            if (foundFut != null)
+                foundFut.onDhtFinishResponse(nodeId, res);
+            else if (msgLog.isDebugEnabled()) {
                 msgLog.debug("Near finish fut, failed to find mini future [txId=" + tx.nearXidVersion() +
                     ", node=" + nodeId +
                     ", res=" + res +
@@ -571,7 +601,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                             }
                         }
                         catch (ClusterTopologyCheckedException ignored) {
-                            mini.onNodeLeft(backupId, false);
+                            mini.onNodeLeft(backupId);
                         }
                         catch (IgniteCheckedException e) {
                             if (msgLog.isDebugEnabled()) {
@@ -731,7 +761,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 // Remove previous mapping.
                 mappings.remove(m.primary().id());
 
-                fut.onNodeLeft(n.id(), false);
+                fut.onNodeLeft(n.id());
             }
             catch (IgniteCheckedException e) {
                 if (msgLog.isDebugEnabled()) {
@@ -813,8 +843,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             tx.system(),
             tx.ioPolicy(),
             false,
-            tx.syncMode() == FULL_SYNC,
-            tx.syncMode() == FULL_SYNC,
+            tx.syncMode(),
             null,
             null,
             null,
@@ -840,10 +869,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
         /**
          * @param nodeId Node ID.
-         * @param discoThread {@code True} if executed from discovery thread.
          * @return {@code True} if future processed node failure.
          */
-        abstract boolean onNodeLeft(UUID nodeId, boolean discoThread);
+        abstract boolean onNodeLeft(UUID nodeId);
+
+        /**
+         * @param nodeId Node ID.
+         * @param res Response.
+         */
+        abstract void onDhtFinishResponse(UUID nodeId, GridDhtTxFinishResponse res);
 
         /**
          * @return Future ID.
@@ -890,7 +924,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
         }
 
         /** {@inheritDoc} */
-        boolean onNodeLeft(UUID nodeId, boolean discoThread) {
+        boolean onNodeLeft(UUID nodeId) {
             if (nodeId.equals(m.primary().id())) {
                 if (msgLog.isDebugEnabled()) {
                     msgLog.debug("Near finish fut, mini future node left [txId=" + tx.nearXidVersion() +
@@ -919,7 +953,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
                                         fut.listen(new CI1<IgniteInternalFuture<?>>() {
                                             @Override public void apply(IgniteInternalFuture<?> fut) {
-                                                mini.onDhtFinishResponse(cctx.localNodeId());
+                                                mini.onDhtFinishResponse(cctx.localNodeId(), null);
                                             }
                                         });
                                     }
@@ -928,7 +962,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                                             cctx.io().send(backup, req, tx.ioPolicy());
                                         }
                                         catch (ClusterTopologyCheckedException ignored) {
-                                            mini.onNodeLeft(backupId, discoThread);
+                                            mini.onNodeLeft(backupId);
                                         }
                                         catch (IgniteCheckedException e) {
                                             mini.onDone(e);
@@ -936,7 +970,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                                     }
                                 }
                                 else
-                                    mini.onDhtFinishResponse(backupId);
+                                    mini.onDhtFinishResponse(backupId, null);
                             }
                         }
                     }
@@ -950,10 +984,24 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             return false;
         }
 
+        /** {@inheritDoc} */
+        @Override void onDhtFinishResponse(UUID nodeId, GridDhtTxFinishResponse res) {
+            assert dhtNodes != null;
+
+            boolean done;
+
+            synchronized (dhtNodes) {
+                done = dhtNodes.remove(nodeId) && dhtNodes.isEmpty();
+            }
+
+            if (done)
+                onDone(tx);
+        }
+
         /**
-         * @param res Result callback.
+         * @param res Response.
          */
-        void onNearFinishResponse(GridNearTxFinishResponse res) {
+        void onPrimaryResponse(GridNearTxFinishResponse res) {
             assert dhtNodes == null || res.error() != null;
 
             if (res.error() != null)
@@ -999,7 +1047,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
         }
 
         /** {@inheritDoc} */
-        @Override boolean onNodeLeft(UUID nodeId, boolean discoThread) {
+        @Override boolean onNodeLeft(UUID nodeId) {
             if (nodeId.equals(backup.id())) {
                 readyNearMappingFromBackup(m);
 
@@ -1011,10 +1059,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             return false;
         }
 
-        /**
-         * @param res Response.
-         */
-        void onDhtFinishResponse(GridDhtTxFinishResponse res) {
+        /** {@inheritDoc} */
+        @Override void onDhtFinishResponse(UUID nodeId, GridDhtTxFinishResponse res) {
             readyNearMappingFromBackup(m);
 
             Throwable err = res.checkCommittedError();
@@ -1060,14 +1106,12 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
         }
 
         /** {@inheritDoc} */
-        @Override boolean onNodeLeft(UUID nodeId, boolean discoThread) {
+        @Override boolean onNodeLeft(UUID nodeId) {
             return onResponse(nodeId);
         }
 
-        /**
-         * @param nodeId Node ID.
-         */
-        void onDhtFinishResponse(UUID nodeId) {
+        /** {@inheritDoc} */
+        @Override void onDhtFinishResponse(UUID nodeId, GridDhtTxFinishResponse res) {
             onResponse(nodeId);
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index dfbbe18..8be4304 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -42,24 +42,6 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
     /** Mini future ID. */
     private IgniteUuid miniId;
 
-    /** Explicit lock flag. */
-    private boolean explicitLock;
-
-    /** Store enabled flag. */
-    private boolean storeEnabled;
-
-    /** Topology version. */
-    private AffinityTopologyVersion topVer;
-
-    /** Subject ID. */
-    private UUID subjId;
-
-    /** Task name hash. */
-    private int taskNameHash;
-
-    /** Write synchronization mode. */
-    private CacheWriteSynchronizationMode syncMode;
-
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -109,48 +91,53 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
         super(
             xidVer,
             futId,
+            topVer,
             null,
             threadId,
             commit,
             invalidate,
             sys,
             plc,
-            syncMode == CacheWriteSynchronizationMode.FULL_SYNC,
-            syncMode == CacheWriteSynchronizationMode.FULL_SYNC,
+            syncMode,
             baseVer,
             committedVers,
             rolledbackVers,
+            subjId,
+            taskNameHash,
             txSize,
             addDepInfo
         );
 
-        this.syncMode = syncMode;
-        this.explicitLock = explicitLock;
-        this.storeEnabled = storeEnabled;
-        this.topVer = topVer;
-        this.subjId = subjId;
-        this.taskNameHash = taskNameHash;
+        explicitLock(explicitLock);
+        storeEnabled(storeEnabled);
     }
 
     /**
-     * @return Transaction write synchronization mode (can be null is message sent from old nodes).
+     * @return Explicit lock flag.
      */
-    @Nullable public CacheWriteSynchronizationMode syncMode() {
-        return syncMode;
+    public boolean explicitLock() {
+        return isFlag(EXPLICIT_LOCK_FLAG_MASK);
     }
 
     /**
-     * @return Explicit lock flag.
+     * @param explicitLock Explicit lock flag.
      */
-    public boolean explicitLock() {
-        return explicitLock;
+    private void explicitLock(boolean explicitLock) {
+        setFlag(explicitLock, EXPLICIT_LOCK_FLAG_MASK);
     }
 
     /**
      * @return Store enabled flag.
      */
     public boolean storeEnabled() {
-        return storeEnabled;
+        return isFlag(STORE_ENABLED_FLAG_MASK);
+    }
+
+    /**
+     * @param storeEnabled Store enabled flag.
+     */
+    private void storeEnabled(boolean storeEnabled) {
+        setFlag(storeEnabled, STORE_ENABLED_FLAG_MASK);
     }
 
     /**
@@ -167,27 +154,6 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
         this.miniId = miniId;
     }
 
-    /**
-     * @return Subject ID.
-     */
-    @Nullable public UUID subjectId() {
-        return subjId;
-    }
-
-    /**
-     * @return Task name hash.
-     */
-    public int taskNameHash() {
-        return taskNameHash;
-    }
-
-    /**
-     * @return Topology version.
-     */
-    @Override public AffinityTopologyVersion topologyVersion() {
-        return topVer;
-    }
-
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
@@ -203,44 +169,8 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
         }
 
         switch (writer.state()) {
-            case 18:
-                if (!writer.writeBoolean("explicitLock", explicitLock))
-                    return false;
-
-                writer.incrementState();
-
-            case 19:
-                if (!writer.writeIgniteUuid("miniId", miniId))
-                    return false;
-
-                writer.incrementState();
-
-            case 20:
-                if (!writer.writeBoolean("storeEnabled", storeEnabled))
-                    return false;
-
-                writer.incrementState();
-
             case 21:
-                if (!writer.writeUuid("subjId", subjId))
-                    return false;
-
-                writer.incrementState();
-
-            case 22:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
-                    return false;
-
-                writer.incrementState();
-
-            case 23:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
-                    return false;
-
-                writer.incrementState();
-
-            case 24:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
@@ -261,60 +191,8 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
             return false;
 
         switch (reader.state()) {
-            case 18:
-                explicitLock = reader.readBoolean("explicitLock");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 19:
-                miniId = reader.readIgniteUuid("miniId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 20:
-                storeEnabled = reader.readBoolean("storeEnabled");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
             case 21:
-                subjId = reader.readUuid("subjId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 22:
-                byte syncModeOrd;
-
-                syncModeOrd = reader.readByte("syncMode");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
-                reader.incrementState();
-
-            case 23:
-                taskNameHash = reader.readInt("taskNameHash");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 24:
-                topVer = reader.readMessage("topVer");
+                miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -333,7 +211,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 25;
+        return 22;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
index 37fbb36..cf1ba46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
@@ -59,15 +59,16 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
     }
 
     /**
+     * @param part Partition.
      * @param xid Xid version.
      * @param nearThreadId Near tx thread ID.
      * @param futId Future ID.
      * @param miniId Mini future Id.
      * @param err Error.
      */
-    public GridNearTxFinishResponse(GridCacheVersion xid, long nearThreadId, IgniteUuid futId, IgniteUuid miniId,
+    public GridNearTxFinishResponse(int part, GridCacheVersion xid, long nearThreadId, IgniteUuid futId, IgniteUuid miniId,
         @Nullable Throwable err) {
-        super(xid, futId);
+        super(part, xid, futId);
 
         assert miniId != null;
 
@@ -127,19 +128,19 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
         }
 
         switch (writer.state()) {
-            case 5:
+            case 7:
                 if (!writer.writeByteArray("errBytes", errBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 6:
+            case 8:
                 if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
-            case 7:
+            case 9:
                 if (!writer.writeLong("nearThreadId", nearThreadId))
                     return false;
 
@@ -161,7 +162,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
             return false;
 
         switch (reader.state()) {
-            case 5:
+            case 7:
                 errBytes = reader.readByteArray("errBytes");
 
                 if (!reader.isLastRead())
@@ -169,7 +170,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
 
                 reader.incrementState();
 
-            case 6:
+            case 8:
                 miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
@@ -177,7 +178,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
 
                 reader.incrementState();
 
-            case 7:
+            case 9:
                 nearThreadId = reader.readLong("nearThreadId");
 
                 if (!reader.isLastRead())
@@ -197,7 +198,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 8;
+        return 10;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 71d2353..8a8dc7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -152,7 +152,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends
      * @param nodeId Sender.
      * @param res Result.
      */
-    public abstract void onResult(UUID nodeId, GridNearTxPrepareResponse res);
+    public abstract void onPrimaryResponse(UUID nodeId, GridNearTxPrepareResponse res);
 
     /**
      * @param nodeId Sender.
@@ -197,7 +197,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends
      * @param res Response.
      */
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-    final void onPrimaryPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
+    final void processPrimaryPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
         if (res == null)
             return;
 


[6/8] ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-4768' into ignite-4768-1

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-4768' into ignite-4768-1

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7cf4b986
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7cf4b986
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7cf4b986

Branch: refs/heads/ignite-4768-1
Commit: 7cf4b9860a6b64d3cb63b347e166988b568ec5b6
Parents: e1961ff
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 10 13:09:44 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 10 13:09:44 2017 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoMessageFactory.java         | 1 -
 .../processors/cache/distributed/dht/GridDhtLockFuture.java  | 6 ++++++
 .../cache/distributed/dht/GridDhtTxPrepareFuture.java        | 1 +
 .../cache/distributed/dht/GridDhtTxPrepareResponse.java      | 2 +-
 .../dht/colocated/GridDhtColocatedLockFuture.java            | 6 ++++++
 .../cache/distributed/near/GridNearLockFuture.java           | 8 +++++++-
 .../near/GridNearOptimisticSerializableTxPrepareFuture.java  | 1 +
 .../distributed/near/GridNearOptimisticTxPrepareFuture.java  | 1 +
 .../internal/processors/cache/local/GridLocalLockFuture.java | 6 ++++++
 .../processors/cache/transactions/IgniteTxHandler.java       | 4 +++-
 10 files changed, 32 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf4b986/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index ce2c72c..6f95400 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -62,7 +62,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRe
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxNearPrepareResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf4b986/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index a0270b0..cda4641 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -660,6 +661,11 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
         return false;
     }
 
+    /** {@inheritDoc} */
+    @Override public IgniteInternalTx tx() {
+        return tx;
+    }
+
     /**
      * @return {@code True} if locks have been acquired.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf4b986/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index a787f5f..7645d58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1418,6 +1418,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                             nearMapping.writes(),
                             tx.transactionNodes(),
                             tx.nearXidVersion(),
+                            false,
                             true,
                             tx.onePhaseCommit(),
                             tx.subjectId(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf4b986/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index cb10374..e7153da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -134,7 +134,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     /**
      * @param val {@code True} if this is reply for near node.
      */
-    void nearNodeResponse(boolean val) {
+    public void nearNodeResponse(boolean val) {
         setFlag(val, NEAR_RES_FLAG_MASK);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf4b986/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 0ce380d..3a84535 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
@@ -235,6 +236,11 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
         return false;
     }
 
+    /** {@inheritDoc} */
+    @Override public IgniteInternalTx tx() {
+        return tx;
+    }
+
     /**
      * @return Future ID.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf4b986/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index ffc84d8..6b07857 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCa
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
@@ -236,10 +237,15 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
         return lockVer;
     }
 
+    /** {@inheritDoc} */
+    @Override public IgniteInternalTx tx() {
+        return tx;
+    }
+
     /**
      * @return Entries.
      */
-    public synchronized List<GridDistributedCacheEntry> entriesCopy() {
+    synchronized List<GridDistributedCacheEntry> entriesCopy() {
         return new ArrayList<>(entries);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf4b986/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index aef40ce..652eec2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -449,6 +449,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             m.writes(),
             m.near(),
             txMapping.transactionNodes(),
+            false,
             m.last(),
             tx.onePhaseCommit(),
             tx.needReturnValue() && tx.implicit(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf4b986/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 0e840fe..5cfb1a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -505,6 +505,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                     m.writes(),
                     m.near(),
                     txMapping.transactionNodes(),
+                    false,
                     m.last(),
                     tx.onePhaseCommit(),
                     tx.needReturnValue() && tx.implicit(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf4b986/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
index 8e224c8..e954821 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
@@ -378,6 +379,11 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteInternalTx tx() {
+        return tx;
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings({"ThrowableInstanceNeverThrown"})
     @Override public boolean cancel() {
         if (onCancelled()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf4b986/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 33c8a18..aaba610 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -969,8 +969,10 @@ public class IgniteTxHandler {
                     req.partition(),
                     req.nearXidVersion(),
                     req.nearFutureId(),
-                    null, //req.nearMiniId(),
+                    req.nearMiniId(),
                     req.deployInfo() != null);
+
+                nearRes.nearNodeResponse(true);
             }
             else {
                 res = new GridDhtTxPrepareResponse(


[2/8] ignite git commit: ignite-4768

Posted by sb...@apache.org.
ignite-4768


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d3d4a36b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d3d4a36b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d3d4a36b

Branch: refs/heads/ignite-4768-1
Commit: d3d4a36b4f0c71b5635dadd9211f73e728e29483
Parents: 784b171
Author: sboikov <sb...@gridgain.com>
Authored: Thu Mar 9 16:38:54 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Mar 9 16:38:54 2017 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtTxPrepareFuture.java | 85 +++++++++++---------
 .../dht/GridDhtTxPrepareRequest.java            | 12 +--
 .../dht/GridDhtTxPrepareResponse.java           | 20 ++---
 3 files changed, 64 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d3d4a36b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index d093b4a..735653d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -562,7 +562,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      * @return Mini future.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    private MiniFuture miniFuture(IgniteUuid miniId) {
+    private MiniFuture miniFuture(int miniId) {
         // We iterate directly over the futs collection here to avoid copy.
         synchronized (sync) {
             int size = futuresCountNoLock();
@@ -576,7 +576,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
                 MiniFuture mini = (MiniFuture)fut;
 
-                if (mini.futureId().equals(miniId)) {
+                if (mini.futureId() == miniId) {
                     if (!mini.isDone())
                         return mini;
                     else
@@ -1233,6 +1233,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 return;
 
             if (last) {
+                int miniId = 0;
+
                 assert tx.transactionNodes() != null;
 
                 final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
@@ -1257,7 +1259,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                     if (tx.remainingTime() == -1)
                         return;
 
-                    MiniFuture fut = new MiniFuture(n.id(), dhtMapping, nearMapping);
+                    MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
 
                     add(fut); // Append new future.
 
@@ -1371,7 +1373,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         if (tx.remainingTime() == -1)
                             return;
 
-                        MiniFuture fut = new MiniFuture(nearMapping.primary().id(), null, nearMapping);
+                        MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping);
 
                         add(fut); // Append new future.
 
@@ -1481,25 +1483,35 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
             try {
                 List<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion());
 
+                assert dhtNodes.size() > 0 && dhtNodes.get(0).id().equals(cctx.localNodeId()) : dhtNodes;
+
                 if (log.isDebugEnabled())
                     log.debug("Mapping entry to DHT nodes [nodes=" + U.toShortString(dhtNodes) +
                         ", entry=" + entry + ']');
 
-                // Exclude local node.
-                map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap);
+                for (int i = 1; i < dhtNodes.size(); i++) {
+                    ClusterNode node = dhtNodes.get(i);
+
+                    addMapping(entry, node, dhtMap);
+                }
 
                 Collection<UUID> readers = cached.readers();
 
                 if (!F.isEmpty(readers)) {
-                    Collection<ClusterNode> nearNodes =
-                        cctx.discovery().nodes(readers, F0.not(F.idForNodeId(tx.nearNodeId())));
+                    for (UUID readerId : readers) {
+                        if (readerId.equals(tx.nearNodeId()))
+                            continue;
 
-                    if (log.isDebugEnabled())
-                        log.debug("Mapping entry to near nodes [nodes=" + U.toShortString(nearNodes) +
-                            ", entry=" + entry + ']');
+                        ClusterNode readerNode = cctx.discovery().node(readerId);
+
+                        if (readerNode == null || dhtNodes.contains(readerNode))
+                            continue;
+
+                        if (log.isDebugEnabled())
+                            log.debug("Mapping entry to near node [node=" + readerNode + ", entry=" + entry + ']');
 
-                    // Exclude DHT nodes.
-                    map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap);
+                        addMapping(entry, readerNode, nearMap);
+                    }
                 }
                 else if (log.isDebugEnabled())
                     log.debug("Entry has no near readers: " + entry);
@@ -1516,39 +1528,35 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
     /**
      * @param entry Entry.
-     * @param nodes Nodes.
+     * @param n Node.
      * @param globalMap Map.
      */
-    private void map(
+    private void addMapping(
         IgniteTxEntry entry,
-        Iterable<ClusterNode> nodes,
+        ClusterNode n,
         Map<UUID, GridDistributedTxMapping> globalMap
     ) {
-        if (nodes != null) {
-            for (ClusterNode n : nodes) {
-                GridDistributedTxMapping global = globalMap.get(n.id());
-
-                if (!F.isEmpty(entry.entryProcessors())) {
-                    GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
-                        entry.cached().partition());
+        GridDistributedTxMapping global = globalMap.get(n.id());
 
-                    if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
-                        T2<GridCacheOperation, CacheObject> procVal = entry.entryProcessorCalculatedValue();
+        if (!F.isEmpty(entry.entryProcessors())) {
+            GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
+                entry.cached().partition());
 
-                        assert procVal != null : entry;
+            if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
+                T2<GridCacheOperation, CacheObject> procVal = entry.entryProcessorCalculatedValue();
 
-                        entry.op(procVal.get1());
-                        entry.value(procVal.get2(), true, false);
-                        entry.entryProcessors(null);
-                    }
-                }
+                assert procVal != null : entry;
 
-                if (global == null)
-                    globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
-
-                global.add(entry);
+                entry.op(procVal.get1());
+                entry.value(procVal.get2(), true, false);
+                entry.entryProcessors(null);
             }
         }
+
+        if (global == null)
+            globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
+
+        global.add(entry);
     }
 
     /**
@@ -1602,7 +1610,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         private static final long serialVersionUID = 0L;
 
         /** */
-        private final IgniteUuid futId = IgniteUuid.randomUuid();
+        private final int futId;
 
         /** Node ID. */
         private UUID nodeId;
@@ -1617,17 +1625,20 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
         /**
          * @param nodeId Node ID.
+         * @param futId Future ID.
          * @param dhtMapping Mapping.
          * @param nearMapping nearMapping.
          */
         MiniFuture(
             UUID nodeId,
+            int futId,
             GridDistributedTxMapping dhtMapping,
             GridDistributedTxMapping nearMapping
         ) {
             assert dhtMapping == null || nearMapping == null || dhtMapping.primary().equals(nearMapping.primary());
 
             this.nodeId = nodeId;
+            this.futId = futId;
             this.dhtMapping = dhtMapping;
             this.nearMapping = nearMapping;
         }
@@ -1635,7 +1646,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         /**
          * @return Future ID.
          */
-        IgniteUuid futureId() {
+        int futureId() {
             return futId;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3d4a36b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 8c01302..8898803 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -59,7 +59,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
     private IgniteUuid futId;
 
     /** Mini future ID. */
-    private IgniteUuid miniId;
+    private int miniId;
 
     /** Topology version. */
     private AffinityTopologyVersion topVer;
@@ -120,7 +120,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
      */
     public GridDhtTxPrepareRequest(
         IgniteUuid futId,
-        IgniteUuid miniId,
+        int miniId,
         AffinityTopologyVersion topVer,
         GridDhtTxLocalAdapter tx,
         long timeout,
@@ -145,7 +145,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
             addDepInfo);
 
         assert futId != null;
-        assert miniId != null;
+        assert miniId != 0;
 
         this.topVer = topVer;
         this.futId = futId;
@@ -245,7 +245,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
     /**
      * @return Mini future ID.
      */
-    public IgniteUuid miniId() {
+    public int miniId() {
         return miniId;
     }
 
@@ -361,7 +361,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 writer.incrementState();
 
             case 22:
-                if (!writer.writeIgniteUuid("miniId", miniId))
+                if (!writer.writeInt("miniId", miniId))
                     return false;
 
                 writer.incrementState();
@@ -453,7 +453,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 22:
-                miniId = reader.readIgniteUuid("miniId");
+                miniId = reader.readInt("miniId");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3d4a36b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index fdead95..416540a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -58,7 +58,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     private IgniteUuid futId;
 
     /** Mini future ID. */
-    private IgniteUuid miniId;
+    private int miniId;
 
     /** Invalid partitions by cache ID. */
     @GridDirectMap(keyType = Integer.class, valueType = int[].class)
@@ -81,11 +81,11 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
      * @param miniId Mini future ID.
      * @param addDepInfo Deployment info flag.
      */
-    public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, boolean addDepInfo) {
+    public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, int miniId, boolean addDepInfo) {
         super(xid, addDepInfo);
 
         assert futId != null;
-        assert miniId != null;
+        assert miniId != 0;
 
         this.futId = futId;
         this.miniId = miniId;
@@ -98,12 +98,12 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
      * @param err Error.
      * @param addDepInfo Deployment enabled.
      */
-    public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, Throwable err,
+    public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, int miniId, Throwable err,
         boolean addDepInfo) {
         super(xid, err, addDepInfo);
 
         assert futId != null;
-        assert miniId != null;
+        assert miniId != 0;
 
         this.futId = futId;
         this.miniId = miniId;
@@ -112,7 +112,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     /**
      * @return Evicted readers.
      */
-    public Collection<IgniteTxKey> nearEvicted() {
+    Collection<IgniteTxKey> nearEvicted() {
         return nearEvicted;
     }
 
@@ -133,14 +133,14 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     /**
      * @return Mini future ID.
      */
-    public IgniteUuid miniId() {
+    public int miniId() {
         return miniId;
     }
 
     /**
      * @return Map from cacheId to an array of invalid partitions.
      */
-    public Map<Integer, int[]> invalidPartitionsByCacheId() {
+    Map<Integer, int[]> invalidPartitionsByCacheId() {
         return invalidParts;
     }
 
@@ -250,7 +250,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeIgniteUuid("miniId", miniId))
+                if (!writer.writeInt("miniId", miniId))
                     return false;
 
                 writer.incrementState();
@@ -300,7 +300,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
                 reader.incrementState();
 
             case 10:
-                miniId = reader.readIgniteUuid("miniId");
+                miniId = reader.readInt("miniId");
 
                 if (!reader.isLastRead())
                     return false;


[7/8] ignite git commit: ignite-4768

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index d9b648c..66fe902 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -344,61 +344,61 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
         }
 
         switch (writer.state()) {
-            case 8:
+            case 10:
                 if (!writer.writeMessage("clientRemapVer", clientRemapVer))
                     return false;
 
                 writer.incrementState();
 
-            case 9:
+            case 11:
                 if (!writer.writeMessage("dhtVer", dhtVer))
                     return false;
 
                 writer.incrementState();
 
-            case 10:
+            case 12:
                 if (!writer.writeCollection("filterFailedKeys", filterFailedKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 11:
+            case 13:
                 if (!writer.writeIgniteUuid("futId", futId))
                     return false;
 
                 writer.incrementState();
 
-            case 12:
+            case 14:
                 if (!writer.writeInt("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
-            case 13:
+            case 15:
                 if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 14:
+            case 16:
                 if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 15:
+            case 17:
                 if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 16:
+            case 18:
                 if (!writer.writeMessage("retVal", retVal))
                     return false;
 
                 writer.incrementState();
 
-            case 17:
+            case 19:
                 if (!writer.writeMessage("writeVer", writeVer))
                     return false;
 
@@ -420,7 +420,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
             return false;
 
         switch (reader.state()) {
-            case 8:
+            case 10:
                 clientRemapVer = reader.readMessage("clientRemapVer");
 
                 if (!reader.isLastRead())
@@ -428,7 +428,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
 
                 reader.incrementState();
 
-            case 9:
+            case 11:
                 dhtVer = reader.readMessage("dhtVer");
 
                 if (!reader.isLastRead())
@@ -436,7 +436,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
 
                 reader.incrementState();
 
-            case 10:
+            case 12:
                 filterFailedKeys = reader.readCollection("filterFailedKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -444,7 +444,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
 
                 reader.incrementState();
 
-            case 11:
+            case 13:
                 futId = reader.readIgniteUuid("futId");
 
                 if (!reader.isLastRead())
@@ -452,7 +452,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
 
                 reader.incrementState();
 
-            case 12:
+            case 14:
                 miniId = reader.readInt("miniId");
 
                 if (!reader.isLastRead())
@@ -460,7 +460,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
 
                 reader.incrementState();
 
-            case 13:
+            case 15:
                 ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -468,7 +468,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
 
                 reader.incrementState();
 
-            case 14:
+            case 16:
                 ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -476,7 +476,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
 
                 reader.incrementState();
 
-            case 15:
+            case 17:
                 pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -484,7 +484,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
 
                 reader.incrementState();
 
-            case 16:
+            case 18:
                 retVal = reader.readMessage("retVal");
 
                 if (!reader.isLastRead())
@@ -492,7 +492,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
 
                 reader.incrementState();
 
-            case 17:
+            case 19:
                 writeVer = reader.readMessage("writeVer");
 
                 if (!reader.isLastRead())
@@ -512,7 +512,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 18;
+        return 20;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/26316bb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index aaba610..22e6b69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -582,7 +582,7 @@ public class IgniteTxHandler {
 
         res.txState(tx.txState());
 
-        fut.onResult(nodeId, res);
+        fut.onPrimaryResponse(nodeId, res);
     }
 
     /**
@@ -607,7 +607,7 @@ public class IgniteTxHandler {
             return;
         }
 
-        fut.onResult(nodeId, res);
+        fut.onPrimaryResponse(nodeId, res);
     }
 
     /**
@@ -650,7 +650,7 @@ public class IgniteTxHandler {
         assert nodeId != null;
         assert res != null;
 
-        if (res.checkCommitted()) {
+        if (res.nearNodeResponse() || res.checkCommitted()) {
             GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId());
 
             if (fut == null) {
@@ -669,7 +669,7 @@ public class IgniteTxHandler {
                     ", node=" + nodeId + ']');
             }
 
-            fut.onResult(nodeId, res);
+            fut.onDhtResponse(nodeId, res);
         }
         else {
             GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId());
@@ -803,8 +803,13 @@ public class IgniteTxHandler {
                 ", commit=" + req.commit() + ']');
 
             // Always send finish response.
-            GridCacheMessage res = new GridNearTxFinishResponse(req.version(), req.threadId(), req.futureId(),
-                req.miniId(), new IgniteCheckedException("Transaction has been already completed."));
+            GridCacheMessage res = new GridNearTxFinishResponse(
+                req.partition(),
+                req.version(),
+                req.threadId(),
+                req.futureId(),
+                req.miniId(),
+                new IgniteCheckedException("Transaction has been already completed."));
 
             try {
                 ctx.io().send(nodeId, res, req.policy());
@@ -844,14 +849,9 @@ public class IgniteTxHandler {
         try {
             assert tx != null : "Transaction is null for near finish request [nodeId=" +
                 nodeId + ", req=" + req + "]";
+            assert req.syncMode() != null : req;
 
-            if (req.syncMode() == null) {
-                boolean sync = req.commit() ? req.syncCommit() : req.syncRollback();
-
-                tx.syncMode(sync ? FULL_SYNC : FULL_ASYNC);
-            }
-            else
-                tx.syncMode(req.syncMode());
+            tx.syncMode(req.syncMode());
 
             if (req.commit()) {
                 tx.storeEnabled(req.storeEnabled());
@@ -1121,7 +1121,7 @@ public class IgniteTxHandler {
         if (req.checkCommitted()) {
             boolean committed = req.waitRemoteTransactions() || !ctx.tm().addRolledbackTx(null, req.version());
 
-            if (!committed || !req.syncCommit())
+            if (!committed || req.syncMode() != FULL_SYNC)
                 sendReply(nodeId, req, committed, null);
             else {
                 IgniteInternalFuture<?> fut = ctx.tm().remoteTxFinishFuture(req.version());
@@ -1359,7 +1359,11 @@ public class IgniteTxHandler {
      */
     private void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) {
         if (req.replyRequired() || req.checkCommitted()) {
-            GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId());
+            GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(
+                req.partition(),
+                req.version(),
+                req.futureId(),
+                req.miniId());
 
             if (req.checkCommitted()) {
                 res.checkCommitted(true);


[5/8] ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-4768' into ignite-4768-1

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-4768' into ignite-4768-1

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e1961ff6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e1961ff6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e1961ff6

Branch: refs/heads/ignite-4768-1
Commit: e1961ff620d8f73469101da96eccda9ed2eddc99
Parents: 5596e69 eeb10b8
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 10 13:09:28 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 10 13:09:28 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMessage.java      | 11 +++++++++++
 .../processors/cache/KeyCacheObjectImpl.java    |  9 +++++++++
 .../GridDistributedTxPrepareRequest.java        | 16 ++++++++++++++++
 .../distributed/dht/GridDhtTxPrepareFuture.java | 20 +++++++++++++-------
 .../dht/GridDhtTxPrepareRequest.java            | 12 ++++++------
 .../dht/GridDhtTxPrepareResponse.java           | 16 ++++++++--------
 .../distributed/near/GridNearLockFuture.java    |  2 ++
 ...arOptimisticSerializableTxPrepareFuture.java |  8 ++++----
 .../near/GridNearOptimisticTxPrepareFuture.java |  1 -
 .../near/GridNearTxPrepareRequest.java          | 17 ++++++++++++++++-
 10 files changed, 85 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e1961ff6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1961ff6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 454c8fb,735653d..a787f5f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@@ -1272,13 -1259,9 +1276,13 @@@ public final class GridDhtTxPrepareFutu
                      if (tx.remainingTime() == -1)
                          return;
  
 -                    MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
 +                    MiniFuture fut = null;
  
 -                    add(fut); // Append new future.
 +                    if (!tx.dhtReplyNear()) {
-                         fut = new MiniFuture(n.id(), dhtMapping, nearMapping);
++                        fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
 +
 +                        add(fut); // Append new future.
 +                    }
  
                      assert txNodes != null;
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1961ff6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index fe2d293,8898803..85a65a8
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@@ -59,14 -59,8 +59,14 @@@ public class GridDhtTxPrepareRequest ex
      private IgniteUuid futId;
  
      /** Mini future ID. */
-     private IgniteUuid miniId;
+     private int miniId;
  
 +    /** Future ID. */
 +    private IgniteUuid nearFutId;
 +
 +    /** Mini future ID. */
 +    private int nearMiniId;
 +
      /** Topology version. */
      private AffinityTopologyVersion topVer;
  
@@@ -128,9 -120,7 +128,9 @@@
       */
      public GridDhtTxPrepareRequest(
          IgniteUuid futId,
-         IgniteUuid miniId,
+         int miniId,
 +        IgniteUuid nearFutId,
 +        int nearMiniId,
          AffinityTopologyVersion topVer,
          GridDhtTxLocalAdapter tx,
          long timeout,
@@@ -156,8 -144,8 +156,8 @@@
              onePhaseCommit,
              addDepInfo);
  
-         assert dhtNearReply || (futId != null && miniId != null);
 -        assert futId != null;
 -        assert miniId != 0;
++        assert dhtNearReply || (futId != null && miniId != 0);
 +        assert !dhtNearReply || (nearFutId != null && nearMiniId != 0);
  
          this.topVer = topVer;
          this.futId = futId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1961ff6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index c37ac80,416540a..cb10374
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@@ -85,16 -81,11 +85,16 @@@ public class GridDhtTxPrepareResponse e
       * @param miniId Mini future ID.
       * @param addDepInfo Deployment info flag.
       */
 -    public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, int miniId, boolean addDepInfo) {
 -        super(xid, addDepInfo);
 +    public GridDhtTxPrepareResponse(
 +        int part,
 +        GridCacheVersion xid,
 +        IgniteUuid futId,
-         IgniteUuid miniId,
++        int miniId,
 +        boolean addDepInfo) {
 +        super(part, xid, addDepInfo);
  
          assert futId != null;
-         assert miniId != null;
+         assert miniId != 0;
  
          this.futId = futId;
          this.miniId = miniId;
@@@ -108,17 -98,12 +108,17 @@@
       * @param err Error.
       * @param addDepInfo Deployment enabled.
       */
 -    public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, int miniId, Throwable err,
 +    public GridDhtTxPrepareResponse(
 +        int part,
 +        GridCacheVersion xid,
 +        IgniteUuid futId,
-         IgniteUuid miniId,
++        int miniId,
 +        Throwable err,
          boolean addDepInfo) {
 -        super(xid, err, addDepInfo);
 +        super(part, xid, err, addDepInfo);
  
          assert futId != null;
-         assert miniId != null;
+         assert miniId != 0;
  
          this.futId = futId;
          this.miniId = miniId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1961ff6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1961ff6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1961ff6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------


[3/8] ignite git commit: ignite-4768

Posted by sb...@apache.org.
ignite-4768


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eeb10b85
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eeb10b85
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eeb10b85

Branch: refs/heads/ignite-4768-1
Commit: eeb10b85ba41b14d0beb590a32bff06225c7de57
Parents: d3d4a36
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 10 11:00:04 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 10 11:04:42 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java       |  3 +++
 .../processors/cache/GridCacheMessage.java         | 11 +++++++++++
 .../processors/cache/KeyCacheObjectImpl.java       |  9 +++++++++
 .../GridDistributedTxPrepareRequest.java           | 16 ++++++++++++++++
 ...dNearOptimisticSerializableTxPrepareFuture.java |  7 ++++---
 .../distributed/near/GridNearTxPrepareRequest.java | 17 ++++++++++++++++-
 6 files changed, 59 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb10b85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 50f58cc..7cac367 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -367,6 +367,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             unmarshall(nodeId, cacheMsg);
 
+//            if (!cacheMsg.partitionExchangeMessage())
+//                log.info("Message [from=" + nodeId + ", msg=" + cacheMsg + ']');
+
             if (cacheMsg.classError() != null)
                 processFailedMessage(nodeId, cacheMsg, c);
             else

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb10b85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 0646d5a..023d12c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -701,6 +701,17 @@ public abstract class GridCacheMessage implements Message {
         return reader.afterMessageRead(GridCacheMessage.class);
     }
 
+    /**
+     * @param str Bulder.
+     * @param name Flag name.
+     */
+    protected final void appendFlag(StringBuilder str, String name) {
+        if (str.length() > 0)
+            str.append('|');
+
+        str.append(name);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheMessage.class, this, "cacheId", cacheId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb10b85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
index 4f8570c..48797b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
@@ -200,4 +201,12 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
 
         return val.equals(other.val);
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(S.INCLUDE_SENSITIVE ? getClass().getSimpleName() : "KeyCacheObject",
+            "part", part, true,
+            "val", val, true,
+            "hasValBytes", valBytes != null, false);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb10b85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index 329dc8b..b5848a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAwa
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.UUIDCollectionMessage;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
@@ -150,6 +151,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
     private IgniteTxState txState;
 
     /** */
+    @GridToStringExclude
     private byte flags;
 
     /**
@@ -682,7 +684,21 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
 
     /** {@inheritDoc} */
     @Override public String toString() {
+        StringBuilder flags = new StringBuilder();
+
+        if (needReturnValue())
+            flags.append("retVal");
+        if (isInvalidate())
+            flags.append("invalidate");
+        if (onePhaseCommit())
+            flags.append("onePhase");
+        if (last())
+            flags.append("last");
+        if (system())
+            flags.append("sys");
+
         return GridToStringBuilder.toString(GridDistributedTxPrepareRequest.class, this,
+            "flags", flags.toString(),
             "super", super.toString());
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb10b85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index f8e0584..80508dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -75,6 +75,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     @GridToStringExclude
     private ClientRemapFuture remapFut;
 
+    /** */
+    private int miniId;
+
     /**
      * @param cctx Context.
      * @param tx Transaction.
@@ -222,7 +225,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
      * @return Mini future.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    private MiniFuture miniFuture(long miniId) {
+    private MiniFuture miniFuture(int miniId) {
         // We iterate directly over the futs collection here to avoid copy.
         synchronized (sync) {
             int size = futuresCountNoLock();
@@ -362,8 +365,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
         checkOnePhase(txMapping);
 
-        int miniId = 0;
-
         for (GridDistributedTxMapping m : mappings.values()) {
             assert !m.empty();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb10b85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index cccc7b4..ffeeb51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
@@ -70,6 +71,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
     private int taskNameHash;
 
     /** */
+    @GridToStringExclude
     private byte flags;
 
     /**
@@ -402,6 +404,19 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridNearTxPrepareRequest.class, this, super.toString());
+        StringBuilder flags = new StringBuilder();
+
+        if (near())
+            flags.append("near");
+        if (firstClientRequest())
+            flags.append("clientReq");
+        if (implicitSingle())
+            flags.append("single");
+        if (explicitLock())
+            flags.append("explicitLock");
+
+        return S.toString(GridNearTxPrepareRequest.class, this,
+            "flags", flags.toString(),
+            "super", super.toString());
     }
 }


[4/8] ignite git commit: ignite-4768

Posted by sb...@apache.org.
ignite-4768


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5596e69f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5596e69f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5596e69f

Branch: refs/heads/ignite-4768-1
Commit: 5596e69f021a8e3da21479a11ee6079314e33144
Parents: 4b44091
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 10 12:58:34 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 10 12:58:34 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   5 -
 .../processors/cache/GridCacheIoManager.java    |   2 +
 .../processors/cache/GridCacheMvccFuture.java   |   3 +
 .../GridDistributedTxPrepareResponse.java       |  44 +++-
 .../dht/GridDhtTxNearPrepareResponse.java       | 232 -------------------
 .../distributed/dht/GridDhtTxPrepareFuture.java |   1 +
 .../dht/GridDhtTxPrepareResponse.java           |  43 +++-
 ...arOptimisticSerializableTxPrepareFuture.java |   4 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   4 +-
 .../GridNearPessimisticTxPrepareFuture.java     |   9 +-
 .../near/GridNearTxFinishFuture.java            |  40 ++--
 .../near/GridNearTxPrepareFutureAdapter.java    |   9 +-
 .../near/GridNearTxPrepareResponse.java         |   4 +-
 .../cache/transactions/IgniteTxHandler.java     |  62 ++---
 modules/yardstick/config/ignite-base-config.xml | 171 +-------------
 .../yardstick/cache/IgnitePutTxBenchmark.java   |   5 +-
 16 files changed, 150 insertions(+), 488 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index df3d7e8..ce2c72c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -174,11 +174,6 @@ public class GridIoMessageFactory implements MessageFactory {
         Message msg = null;
 
         switch (type) {
-            case -50:
-                msg = new GridDhtTxNearPrepareResponse();
-
-                break;
-
             case -44:
                 msg = new TcpCommunicationSpi.HandshakeMessage2();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 7cac367..71f4e1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -547,6 +547,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridDhtTxPrepareRequest req = (GridDhtTxPrepareRequest)msg;
 
                 GridDhtTxPrepareResponse res = new GridDhtTxPrepareResponse(
+                    req.partition(),
                     req.version(),
                     req.futureId(),
                     req.miniId(),
@@ -672,6 +673,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg;
 
                 GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+                    req.partition(),
                     req.version(),
                     req.futureId(),
                     req.miniId(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java
index 080a6f1..a0acf18 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 
 /**
@@ -34,4 +35,6 @@ public interface GridCacheMvccFuture<T> extends GridCacheFuture<T> {
      * @return {@code True} if future cares about this entry.
      */
     public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner);
+
+    public IgniteInternalTx tx();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
index 99f36c2..c19b8c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@ -51,6 +51,12 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
     @GridDirectTransient
     private IgniteTxState txState;
 
+    /** */
+    private int part;
+
+    /** */
+    private byte flags;
+
     /**
      * Empty constructor (required by {@link Externalizable}).
      */
@@ -59,24 +65,54 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
     }
 
     /**
-     * @param xid Transaction ID.
+     * @param part Partition.
+     * @param xid Lock or transaction ID.
      * @param addDepInfo Deployment info flag.
      */
-    public GridDistributedTxPrepareResponse(GridCacheVersion xid, boolean addDepInfo) {
+    public GridDistributedTxPrepareResponse(int part, GridCacheVersion xid, boolean addDepInfo) {
         super(xid, 0, addDepInfo);
+
+        this.part = part;
     }
 
     /**
-     * @param xid Lock ID.
+     * @param part Partition.
+     * @param xid Lock or transaction ID.
      * @param err Error.
      * @param addDepInfo Deployment info flag.
      */
-    public GridDistributedTxPrepareResponse(GridCacheVersion xid, Throwable err, boolean addDepInfo) {
+    public GridDistributedTxPrepareResponse(int part, GridCacheVersion xid, Throwable err, boolean addDepInfo) {
         super(xid, 0, addDepInfo);
 
+        this.part = part;
         this.err = err;
     }
 
+    /**
+     * Sets flag mask.
+     *
+     * @param flag Set or clear.
+     * @param mask Mask.
+     */
+    protected final void setFlag(boolean flag, int mask) {
+        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+    }
+
+    /**
+     * Reags flag mask.
+     *
+     * @param mask Mask to read.
+     * @return Flag value.
+     */
+    protected final boolean isFlag(int mask) {
+        return (flags & mask) != 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partition() {
+        return part;
+    }
+
     /** {@inheritDoc} */
     @Override public Throwable error() {
         return err;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxNearPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxNearPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxNearPrepareResponse.java
deleted file mode 100644
index e582bd2..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxNearPrepareResponse.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class GridDhtTxNearPrepareResponse extends GridCacheMessage implements IgniteTxStateAware {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private int partId;
-
-    /** */
-    private GridCacheVersion nearTxId;
-
-    /** Future ID.  */
-    private IgniteUuid futId;
-
-    /** Mini future ID. */
-    private int miniId;
-
-    /** Transient TX state. */
-    @GridDirectTransient
-    private IgniteTxState txState;
-
-    /**
-     *
-     */
-    public GridDhtTxNearPrepareResponse() {
-        // No-op.
-    }
-
-    /**
-     * @param partId Partition ID.
-     * @param nearTxId Near transaction ID.
-     * @param futId Future ID.
-     * @param miniId Mini future ID.
-     */
-    public GridDhtTxNearPrepareResponse(int partId, GridCacheVersion nearTxId, IgniteUuid futId, int miniId) {
-        assert nearTxId != null;
-        assert futId != null;
-        assert miniId > 0;
-
-        this.partId = partId;
-        this.nearTxId = nearTxId;
-        this.futId = futId;
-        this.miniId = miniId;
-    }
-
-    /**
-     * @return Near transaction ID.
-     */
-    public GridCacheVersion nearTxId() {
-        return nearTxId;
-    }
-
-    /**
-     * @return Future ID.
-     */
-    public IgniteUuid futureId() {
-        return futId;
-    }
-
-    /**
-     * @return Mini future ID.
-     */
-    public int miniId() {
-        return miniId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int partition() {
-        return partId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteTxState txState() {
-        return txState;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void txState(IgniteTxState txState) {
-        this.txState = txState;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
-        return ctx.txPrepareMessageLogger();
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return -50;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 7;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean addDeploymentInfo() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!super.writeTo(buf, writer))
-            return false;
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 3:
-                if (!writer.writeIgniteUuid("futId", futId))
-                    return false;
-
-                writer.incrementState();
-
-            case 4:
-                if (!writer.writeInt("miniId", miniId))
-                    return false;
-
-                writer.incrementState();
-
-            case 5:
-                if (!writer.writeMessage("nearTxId", nearTxId))
-                    return false;
-
-                writer.incrementState();
-
-            case 6:
-                if (!writer.writeInt("partId", partId))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        if (!super.readFrom(buf, reader))
-            return false;
-
-        switch (reader.state()) {
-            case 3:
-                futId = reader.readIgniteUuid("futId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 4:
-                miniId = reader.readInt("miniId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 5:
-                nearTxId = reader.readMessage("nearTxId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 6:
-                partId = reader.readInt("partId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return reader.afterMessageRead(GridDhtTxNearPrepareResponse.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtTxNearPrepareResponse.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 7f0cddd..454c8fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -858,6 +858,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         assert F.isEmpty(tx.invalidPartitions());
 
         GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+            -1,
             tx.nearXidVersion(),
             tx.colocated() ? tx.xid() : tx.nearFutureId(),
             nearMiniId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index fdead95..c37ac80 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -49,6 +49,9 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private static final int NEAR_RES_FLAG_MASK = 0x01;
+
     /** Evicted readers. */
     @GridToStringInclude
     @GridDirectCollection(IgniteTxKey.class)
@@ -76,13 +79,19 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     }
 
     /**
+     * @param part Partition.
      * @param xid Xid version.
      * @param futId Future ID.
      * @param miniId Mini future ID.
      * @param addDepInfo Deployment info flag.
      */
-    public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, boolean addDepInfo) {
-        super(xid, addDepInfo);
+    public GridDhtTxPrepareResponse(
+        int part,
+        GridCacheVersion xid,
+        IgniteUuid futId,
+        IgniteUuid miniId,
+        boolean addDepInfo) {
+        super(part, xid, addDepInfo);
 
         assert futId != null;
         assert miniId != null;
@@ -92,15 +101,21 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     }
 
     /**
+     * @param part Partition.
      * @param xid Xid version.
      * @param futId Future ID.
      * @param miniId Mini future ID.
      * @param err Error.
      * @param addDepInfo Deployment enabled.
      */
-    public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, Throwable err,
+    public GridDhtTxPrepareResponse(
+        int part,
+        GridCacheVersion xid,
+        IgniteUuid futId,
+        IgniteUuid miniId,
+        Throwable err,
         boolean addDepInfo) {
-        super(xid, err, addDepInfo);
+        super(part, xid, err, addDepInfo);
 
         assert futId != null;
         assert miniId != null;
@@ -110,9 +125,23 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     }
 
     /**
+     * @return {@code True} if this is reply for near node.
+     */
+    public boolean nearNodeResponse() {
+        return isFlag(NEAR_RES_FLAG_MASK);
+    }
+
+    /**
+     * @param val {@code True} if this is reply for near node.
+     */
+    void nearNodeResponse(boolean val) {
+        setFlag(val, NEAR_RES_FLAG_MASK);
+    }
+
+    /**
      * @return Evicted readers.
      */
-    public Collection<IgniteTxKey> nearEvicted() {
+    Collection<IgniteTxKey> nearEvicted() {
         return nearEvicted;
     }
 
@@ -140,7 +169,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     /**
      * @return Map from cacheId to an array of invalid partitions.
      */
-    public Map<Integer, int[]> invalidPartitionsByCacheId() {
+    Map<Integer, int[]> invalidPartitionsByCacheId() {
         return invalidParts;
     }
 
@@ -156,7 +185,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
      *
      * @return Collection of entry infos need to be preloaded.
      */
-    public Collection<GridCacheEntryInfo> preloadEntries() {
+    Collection<GridCacheEntryInfo> preloadEntries() {
         return preloadEntries == null ? Collections.<GridCacheEntryInfo>emptyList() : preloadEntries;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index fcd714b..3d919f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -37,7 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxNearPrepareResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -202,7 +202,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     }
 
     /** {@inheritDoc} */
-    @Override public void onDhtResponse(UUID nodeId, GridDhtTxNearPrepareResponse res) {
+    @Override public void onDhtResponse(UUID nodeId, GridDhtTxPrepareResponse res) {
         assert false; // TODO IGNITE-4768.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 9a7f500..5cfb1a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -40,7 +40,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxNearPrepareResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -203,7 +203,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
     }
 
     /** {@inheritDoc} */
-    @Override public void onDhtResponse(UUID nodeId, GridDhtTxNearPrepareResponse res) {
+    @Override public void onDhtResponse(UUID nodeId, GridDhtTxPrepareResponse res) {
         assert false; // TODO IGNITE-4768.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index ed3f2f1..b476336 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -37,7 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxNearPrepareResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
@@ -126,7 +126,9 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
     }
 
     /** {@inheritDoc} */
-    @Override public void onDhtResponse(UUID nodeId, GridDhtTxNearPrepareResponse res) {
+    @Override public void onDhtResponse(UUID nodeId, GridDhtTxPrepareResponse res) {
+        assert res.nearNodeResponse() : res;
+
         MiniFuture f = miniFuture(res.miniId());
 
         if (f != null)
@@ -438,6 +440,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
             this.m = m;
             this.futId = futId;
 
+            // TODO: IGNITE-4768, check nodes alive.
             if (req.dhtReplyNear()) {
                 dhtNodes = new HashSet<>(req.transactionNodes().get(m.primary().id()));
 
@@ -510,7 +513,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
          * @param nodeId Node ID.
          * @param res Response.
          */
-        void onDhtResponse(UUID nodeId, GridDhtTxNearPrepareResponse res) {
+        void onDhtResponse(UUID nodeId, GridDhtTxPrepareResponse res) {
             assert dhtNodes != null;
 
             boolean done;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index c13650e..06cf878 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -256,7 +256,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                     CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut;
 
                     if (f.futureId().equals(res.miniId()))
-                        f.onDhtFinishResponse(nodeId, false);
+                        f.onDhtFinishResponse(nodeId);
                 }
             }
 
@@ -690,6 +690,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             tx.activeCachesDeploymentEnabled()
         );
 
+        boolean dhtReplyNear = tx.dhtReplyNear() && syncMode == FULL_SYNC;
+
         // If this is the primary node for the keys.
         if (n.isLocal()) {
             req.miniId(IgniteUuid.randomUuid());
@@ -701,7 +703,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 add(fut);
         }
         else {
-            FinishMiniFuture fut = new FinishMiniFuture(m);
+            FinishMiniFuture fut = new FinishMiniFuture(m, dhtReplyNear);
 
             req.miniId(fut.futureId());
 
@@ -862,11 +864,22 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
         @GridToStringInclude
         private GridDistributedTxMapping m;
 
+        /** */
+        private final Set<UUID> dhtNodes;
+
         /**
          * @param m Mapping.
          */
-        FinishMiniFuture(GridDistributedTxMapping m) {
+        FinishMiniFuture(GridDistributedTxMapping m, boolean dhtReplyNear) {
             this.m = m;
+
+            if (dhtReplyNear) {
+                dhtNodes = new HashSet<>(tx.transactionNodes().get(m.primary().id()));
+
+                assert !dhtNodes.isEmpty();
+            }
+            else
+                dhtNodes = null;
         }
 
         /**
@@ -876,13 +889,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             return m.primary();
         }
 
-        /**
-         * @return Keys.
-         */
-        public GridDistributedTxMapping mapping() {
-            return m;
-        }
-
         /** {@inheritDoc} */
         boolean onNodeLeft(UUID nodeId, boolean discoThread) {
             if (nodeId.equals(m.primary().id())) {
@@ -913,7 +919,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
                                         fut.listen(new CI1<IgniteInternalFuture<?>>() {
                                             @Override public void apply(IgniteInternalFuture<?> fut) {
-                                                mini.onDhtFinishResponse(cctx.localNodeId(), true);
+                                                mini.onDhtFinishResponse(cctx.localNodeId());
                                             }
                                         });
                                     }
@@ -930,7 +936,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                                     }
                                 }
                                 else
-                                    mini.onDhtFinishResponse(backupId, true);
+                                    mini.onDhtFinishResponse(backupId);
                             }
                         }
                     }
@@ -948,6 +954,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
          * @param res Result callback.
          */
         void onNearFinishResponse(GridNearTxFinishResponse res) {
+            assert dhtNodes == null || res.error() != null;
+
             if (res.error() != null)
                 onDone(res.error());
             else
@@ -956,7 +964,10 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(FinishMiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
+            return S.toString(FinishMiniFuture.class, this,
+                "done", isDone(),
+                "cancelled", isCancelled(),
+                "err", error());
         }
     }
 
@@ -1055,9 +1066,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
         /**
          * @param nodeId Node ID.
-         * @param discoThread {@code True} if executed from discovery thread.
          */
-        void onDhtFinishResponse(UUID nodeId, boolean discoThread) {
+        void onDhtFinishResponse(UUID nodeId) {
             onResponse(nodeId);
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 7f94e9f..71d2353 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -30,7 +30,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxNearPrepareResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -39,7 +39,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteReducer;
@@ -159,7 +158,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends
      * @param nodeId Sender.
      * @param res Response.
      */
-    public abstract void onDhtResponse(UUID nodeId, GridDhtTxNearPrepareResponse res);
+    public abstract void onDhtResponse(UUID nodeId, GridDhtTxPrepareResponse res);
 
     /**
      * Checks if mapped transaction can be committed on one phase.
@@ -189,8 +188,8 @@ public abstract class GridNearTxPrepareFutureAdapter extends
      * @param m Mapping.
      * @param res Response.
      */
-    final void onDhtPrepareResponse(GridDistributedTxMapping m, GridDhtTxNearPrepareResponse res) {
-
+    final void processDhtPrepareResponse(GridDistributedTxMapping m, GridDhtTxPrepareResponse res) {
+        // TODO IGNITE-4768.
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index 08b071d..d9b648c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -102,6 +102,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
     }
 
     /**
+     * @param part Partition.
      * @param xid Xid version.
      * @param futId Future ID.
      * @param miniId Mini future ID.
@@ -113,6 +114,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
      * @param addDepInfo Deployment info flag.
      */
     public GridNearTxPrepareResponse(
+        int part,
         GridCacheVersion xid,
         IgniteUuid futId,
         int miniId,
@@ -123,7 +125,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
         AffinityTopologyVersion clientRemapVer,
         boolean addDepInfo
     ) {
-        super(xid, err, addDepInfo);
+        super(part, xid, err, addDepInfo);
 
         assert futId != null;
         assert dhtVer != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 07777e2..33c8a18 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -45,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFini
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxNearPrepareResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
@@ -152,12 +152,6 @@ public class IgniteTxHandler {
             }
         });
 
-        ctx.io().addHandler(0, GridDhtTxNearPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
-            @Override public void apply(UUID nodeId, GridCacheMessage msg) {
-                processDhtTxNearPrepareResponse(nodeId, (GridDhtTxNearPrepareResponse)msg);
-            }
-        });
-
         ctx.io().addHandler(0, GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest)msg);
@@ -278,6 +272,7 @@ public class IgniteTxHandler {
                         U.error(log, "Failed to prepare DHT transaction: " + locTx, e);
 
                     return new GridNearTxPrepareResponse(
+                        req.partition(),
                         req.version(),
                         req.futureId(),
                         req.miniId(),
@@ -389,6 +384,7 @@ public class IgniteTxHandler {
                     }
 
                     GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+                        req.partition(),
                         req.version(),
                         req.futureId(),
                         req.miniId(),
@@ -565,34 +561,6 @@ public class IgniteTxHandler {
      * @param nodeId Node ID.
      * @param res Response.
      */
-    private void processDhtTxNearPrepareResponse(UUID nodeId, GridDhtTxNearPrepareResponse res) {
-        if (txPrepareMsgLog.isDebugEnabled())
-            txPrepareMsgLog.debug("Received dht near prepare response [txId=" + res.nearTxId() + ", node=" + nodeId + ']');
-
-        GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)ctx.mvcc()
-            .<IgniteInternalTx>mvccFuture(res.nearTxId(), res.futureId());
-
-        if (fut == null) {
-            U.warn(log, "Failed to find future for dht near prepare response [txId=" + res.nearTxId() +
-                ", node=" + nodeId +
-                ", res=" + res + ']');
-
-            return;
-        }
-
-        IgniteInternalTx tx = fut.tx();
-
-        assert tx != null;
-
-        res.txState(tx.txState());
-
-        fut.onDhtResponse(nodeId, res);
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param res Response.
-     */
     private void processNearTxPrepareResponse(UUID nodeId, GridNearTxPrepareResponse res) {
         if (txPrepareMsgLog.isDebugEnabled())
             txPrepareMsgLog.debug("Received near prepare response [txId=" + res.version() + ", node=" + nodeId + ']');
@@ -647,7 +615,7 @@ public class IgniteTxHandler {
      * @param res Response.
      */
     private void processDhtTxPrepareResponse(UUID nodeId, GridDhtTxPrepareResponse res) {
-        GridDhtTxPrepareFuture fut = (GridDhtTxPrepareFuture)ctx.mvcc().mvccFuture(res.version(), res.futureId());
+        GridCacheMvccFuture<?> fut = ctx.mvcc().mvccFuture(res.version(), res.futureId());
 
         if (fut == null) {
             if (txPrepareMsgLog.isDebugEnabled()) {
@@ -668,7 +636,10 @@ public class IgniteTxHandler {
 
         res.txState(tx.txState());
 
-        fut.onResult(nodeId, res);
+        if (res.nearNodeResponse())
+            ((GridNearTxPrepareFutureAdapter)fut).onDhtResponse(nodeId, res);
+        else
+            ((GridDhtTxPrepareFuture)fut).onResult(nodeId, res);
     }
 
     /**
@@ -990,18 +961,21 @@ public class IgniteTxHandler {
         GridNearTxRemote nearTx = null;
 
         GridDhtTxPrepareResponse res = null;
-        GridDhtTxNearPrepareResponse nearRes = null;
+        GridDhtTxPrepareResponse nearRes = null;
 
         try {
             if (req.dhtReplyNear()) {
-                nearRes = new GridDhtTxNearPrepareResponse(
+                nearRes = new GridDhtTxPrepareResponse(
                     req.partition(),
                     req.nearXidVersion(),
                     req.nearFutureId(),
-                    req.nearMiniId());
+                    null, //req.nearMiniId(),
+                    req.deployInfo() != null);
             }
             else {
-                res = new GridDhtTxPrepareResponse(req.version(),
+                res = new GridDhtTxPrepareResponse(
+                    req.partition(),
+                    req.version(),
                     req.futureId(),
                     req.miniId(),
                     req.deployInfo() != null);
@@ -1061,7 +1035,11 @@ public class IgniteTxHandler {
             if (nearTx != null)
                 nearTx.rollback();
 
-            res = new GridDhtTxPrepareResponse(req.version(), req.futureId(), req.miniId(), e,
+            res = new GridDhtTxPrepareResponse(req.partition(),
+                req.version(),
+                req.futureId(),
+                req.miniId(),
+                e,
                 req.deployInfo() != null);
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/yardstick/config/ignite-base-config.xml
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/ignite-base-config.xml b/modules/yardstick/config/ignite-base-config.xml
index 615cb42..5ee42a7 100644
--- a/modules/yardstick/config/ignite-base-config.xml
+++ b/modules/yardstick/config/ignite-base-config.xml
@@ -32,41 +32,6 @@
         <property name="cacheConfiguration">
             <list>
                 <bean class="org.apache.ignite.configuration.CacheConfiguration">
-                    <property name="name" value="atomic"/>
-
-                    <property name="cacheMode" value="PARTITIONED"/>
-
-                    <property name="atomicityMode" value="ATOMIC"/>
-
-                    <property name="swapEnabled" value="false"/>
-                </bean>
-
-                <bean class="org.apache.ignite.configuration.CacheConfiguration">
-                    <property name="name" value="atomic-offheap"/>
-
-                    <property name="cacheMode" value="PARTITIONED"/>
-
-                    <property name="atomicityMode" value="ATOMIC"/>
-
-                    <property name="swapEnabled" value="false"/>
-
-                    <property name="memoryMode" value="OFFHEAP_TIERED"/>
-                </bean>
-
-                <bean class="org.apache.ignite.configuration.CacheConfiguration">
-                    <property name="name" value="atomic-offheap-values"/>
-
-                    <property name="cacheMode" value="PARTITIONED"/>
-
-                    <property name="atomicityMode" value="ATOMIC"/>
-
-                    <property name="swapEnabled" value="false"/>
-
-                    <property name="memoryMode" value="OFFHEAP_VALUES"/>
-
-                </bean>
-
-                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                     <property name="name" value="tx"/>
 
                     <property name="cacheMode" value="PARTITIONED"/>
@@ -75,141 +40,9 @@
 
                     <property name="swapEnabled" value="false"/>
 
-                </bean>
-
-                <bean class="org.apache.ignite.configuration.CacheConfiguration">
-                    <property name="name" value="tx-offheap"/>
-
-                    <property name="cacheMode" value="PARTITIONED"/>
-
-                    <property name="atomicityMode" value="TRANSACTIONAL"/>
-
-                    <property name="swapEnabled" value="false"/>
-
-                    <property name="memoryMode" value="OFFHEAP_TIERED"/>
-
-                </bean>
+                    <property name="writeSynchronizationMode" value="FULL_SYNC"/>
 
-                <bean class="org.apache.ignite.configuration.CacheConfiguration">
-                    <property name="name" value="tx-offheap-values"/>
-
-                    <property name="cacheMode" value="PARTITIONED"/>
-
-                    <property name="atomicityMode" value="TRANSACTIONAL"/>
-
-                    <property name="swapEnabled" value="false"/>
-
-                    <property name="memoryMode" value="OFFHEAP_VALUES"/>
-
-                </bean>
-
-                <bean class="org.apache.ignite.configuration.CacheConfiguration">
-                    <property name="name" value="atomic-index"/>
-
-                    <property name="cacheMode" value="PARTITIONED"/>
-
-                    <property name="atomicityMode" value="ATOMIC"/>
-
-                    <property name="swapEnabled" value="false"/>
-
-                    <property name="indexedTypes">
-                        <list>
-                            <value>java.lang.Integer</value>
-                            <value>org.apache.ignite.yardstick.cache.model.Person1</value>
-
-                            <value>java.lang.Integer</value>
-                            <value>org.apache.ignite.yardstick.cache.model.Person2</value>
-
-                            <value>java.lang.Integer</value>
-                            <value>org.apache.ignite.yardstick.cache.model.Person8</value>
-                        </list>
-                    </property>
-                </bean>
-
-                <bean class="org.apache.ignite.configuration.CacheConfiguration">
-                    <property name="name" value="atomic-index-with-eviction"/>
-
-                    <property name="evictionPolicy">
-                        <bean class="org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy">
-                            <!-- default range (1000000) x (1 + default backups number (1)) / default nodes number (4) -->
-                            <constructor-arg value="500000"/>
-                        </bean>
-                    </property>
-
-                    <property name="cacheMode" value="PARTITIONED"/>
-
-                    <property name="atomicityMode" value="ATOMIC"/>
-
-                    <property name="swapEnabled" value="false"/>
-
-                    <property name="indexedTypes">
-                        <list>
-                            <value>java.lang.Integer</value>
-                            <value>org.apache.ignite.yardstick.cache.model.Person1</value>
-
-                            <value>java.lang.Integer</value>
-                            <value>org.apache.ignite.yardstick.cache.model.Person2</value>
-
-                            <value>java.lang.Integer</value>
-                            <value>org.apache.ignite.yardstick.cache.model.Person8</value>
-                        </list>
-                    </property>
-                </bean>
-
-                <bean class="org.apache.ignite.configuration.CacheConfiguration">
-                    <property name="name" value="query"/>
-
-                    <property name="cacheMode" value="PARTITIONED"/>
-
-                    <property name="atomicityMode" value="ATOMIC"/>
-
-                    <property name="indexedTypes">
-                        <list>
-                            <value>java.lang.Integer</value>
-                            <value>org.apache.ignite.yardstick.cache.model.Organization</value>
-
-                            <value>java.lang.Integer</value>
-                            <value>org.apache.ignite.yardstick.cache.model.Person</value>
-
-                            <value>java.lang.Integer</value>
-                            <value>org.apache.ignite.yardstick.cache.model.SampleValue</value>
-
-                            <value>java.lang.Integer</value>
-                            <value>java.lang.Integer</value>
-                        </list>
-                    </property>
-                </bean>
-
-                <bean class="org.apache.ignite.configuration.CacheConfiguration">
-                    <property name="name" value="query-offheap"/>
-
-                    <property name="cacheMode" value="PARTITIONED"/>
-
-                    <property name="atomicityMode" value="ATOMIC"/>
-
-                    <property name="swapEnabled" value="false"/>
-
-                    <property name="memoryMode" value="OFFHEAP_TIERED"/>
-
-                    <property name="indexedTypes">
-                        <list>
-                            <value>java.lang.Integer</value>
-                            <value>org.apache.ignite.yardstick.cache.model.Organization</value>
-
-                            <value>java.lang.Integer</value>
-                            <value>org.apache.ignite.yardstick.cache.model.Person</value>
-                        </list>
-                    </property>
-                </bean>
-
-                <bean class="org.apache.ignite.configuration.CacheConfiguration">
-                    <property name="name" value="compute"/>
-
-                    <property name="cacheMode" value="PARTITIONED"/>
-
-                    <property name="atomicityMode" value="TRANSACTIONAL"/>
-
-                    <property name="swapEnabled" value="false"/>
+                    <property name="backups" value="2"/>
                 </bean>
             </list>
         </property>

http://git-wip-us.apache.org/repos/asf/ignite/blob/5596e69f/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxBenchmark.java
index 15b7cd6..3ad5a9d 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutTxBenchmark.java
@@ -22,9 +22,12 @@ import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.yardstick.IgniteBenchmarkUtils;
 import org.apache.ignite.yardstick.cache.model.SampleValue;
 import org.yardstickframework.BenchmarkConfiguration;
+import org.yardstickframework.BenchmarkUtils;
 
 /**
  * Ignite benchmark that performs transactional put operations.
@@ -58,7 +61,7 @@ public class IgnitePutTxBenchmark extends IgniteCacheAbstractBenchmark<Integer,
 
     /** {@inheritDoc} */
     @Override public boolean test(Map<Object, Object> ctx) throws Exception {
-        IgniteBenchmarkUtils.doInTransaction(txs, args.txConcurrency(), args.txIsolation(), clo);
+        IgniteBenchmarkUtils.doInTransaction(txs, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ, clo);
 
         return true;
     }