You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/20 08:40:13 UTC

[66/71] [abbrv] ignite git commit: ignite-1561 Fixed tx prepare for cross cache tx with near + colocated cache

ignite-1561 Fixed tx prepare for cross cache tx with near + colocated cache


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a5088265
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a5088265
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a5088265

Branch: refs/heads/ignite-4535
Commit: a5088265d7927aab702425249b3f0d6996cb989e
Parents: badf49c
Author: sboikov <sb...@gridgain.com>
Authored: Thu Apr 20 08:29:42 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Apr 20 08:29:42 2017 +0300

----------------------------------------------------------------------
 .../cache/GridCacheSharedContext.java           |   6 +-
 .../processors/cache/GridCacheUtils.java        |  83 +++++--
 .../distributed/GridDistributedTxMapping.java   |  68 +++++-
 .../GridDistributedTxRemoteAdapter.java         |   2 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |   2 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   2 +
 ...arOptimisticSerializableTxPrepareFuture.java | 218 ++++++++++++-------
 .../near/GridNearOptimisticTxPrepareFuture.java |  95 ++++++--
 .../GridNearPessimisticTxPrepareFuture.java     | 186 ++++++++++------
 .../near/GridNearTxFinishFuture.java            |   6 +-
 .../cache/distributed/near/GridNearTxLocal.java |  30 +--
 .../near/GridNearTxPrepareFutureAdapter.java    |  30 +--
 .../cache/transactions/IgniteTxHandler.java     |  49 +----
 .../IgniteTxImplicitSingleStateImpl.java        |   6 +
 .../transactions/IgniteTxLocalAdapter.java      |   7 +-
 .../cache/transactions/IgniteTxLocalState.java  |  10 +
 .../cache/transactions/IgniteTxManager.java     |   7 +-
 .../cache/transactions/IgniteTxState.java       |   2 +-
 .../cache/transactions/IgniteTxStateImpl.java   |  52 +++--
 .../lang/gridfunc/PredicateCollectionView.java  |   7 +-
 .../util/lang/gridfunc/PredicateMapView.java    |   6 -
 .../util/lang/gridfunc/PredicateSetView.java    |   6 -
 .../lang/gridfunc/ReadOnlyCollectionView.java   |   6 -
 .../lang/gridfunc/ReadOnlyCollectionView2X.java |   6 -
 .../lang/gridfunc/TransformCollectionView.java  |   7 +-
 .../util/lang/gridfunc/TransformMapView.java    |   6 -
 ...sCacheTxNearEnabledRandomOperationsTest.java |  28 +++
 .../cache/CrossCacheTxRandomOperationsTest.java |  23 +-
 .../dht/GridNearCacheTxNodeFailureSelfTest.java |  31 ---
 .../dht/IgniteCrossCacheTxSelfTest.java         |   8 +
 .../IgniteCacheFailoverTestSuite.java           |   2 -
 .../testsuites/IgniteCacheTestSuite2.java       |   2 +
 32 files changed, 631 insertions(+), 368 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 34bb321..79083e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -52,7 +52,7 @@ import org.apache.ignite.internal.processors.cache.transactions.TransactionMetri
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
-import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -742,7 +742,7 @@ public class GridCacheSharedContext<K, V> {
      * @param cacheCtx Cache context.
      * @return Error message if transactions are incompatible.
      */
-    @Nullable public String verifyTxCompatibility(IgniteInternalTx tx, GridLongList activeCacheIds,
+    @Nullable public String verifyTxCompatibility(IgniteInternalTx tx, GridIntList activeCacheIds,
         GridCacheContext<K, V> cacheCtx) {
         if (cacheCtx.systemTx() && !tx.system())
             return "system cache can be enlisted only in system transaction";
@@ -751,7 +751,7 @@ public class GridCacheSharedContext<K, V> {
             return "non-system cache can't be enlisted in system transaction";
 
         for (int i = 0; i < activeCacheIds.size(); i++) {
-            int cacheId = (int)activeCacheIds.get(i);
+            int cacheId = activeCacheIds.get(i);
 
             GridCacheContext<K, V> activeCacheCtx = cacheContext(cacheId);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index d20a782..51a95a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -211,14 +211,6 @@ public class GridCacheUtils {
     /** Default transaction config. */
     private static final TransactionConfiguration DEFAULT_TX_CFG = new TransactionConfiguration();
 
-    /** Partition to state transformer. */
-    private static final IgniteClosure PART2STATE =
-        new C1<GridDhtLocalPartition, GridDhtPartitionState>() {
-            @Override public GridDhtPartitionState apply(GridDhtLocalPartition p) {
-                return p.state();
-            }
-        };
-
     /** Empty predicate array. */
     private static final IgnitePredicate[] EMPTY_FILTER = new IgnitePredicate[0];
 
@@ -247,24 +239,79 @@ public class GridCacheUtils {
     private static final CacheEntryPredicate[] ALWAYS_FALSE0_ARR = new CacheEntryPredicate[] {ALWAYS_FALSE0};
 
     /** Read filter. */
-    private static final IgnitePredicate READ_FILTER = new P1<Object>() {
-        @Override public boolean apply(Object e) {
-            return ((IgniteTxEntry)e).op() == READ;
+    public static final IgnitePredicate READ_FILTER = new P1<IgniteTxEntry>() {
+        @Override public boolean apply(IgniteTxEntry e) {
+            return e.op() == READ;
+        }
+
+        @Override public String toString() {
+            return "READ_FILTER";
+        }
+    };
+
+    /** Read filter. */
+    public static final IgnitePredicate READ_FILTER_NEAR = new P1<IgniteTxEntry>() {
+        @Override public boolean apply(IgniteTxEntry e) {
+            return e.op() == READ && e.context().isNear();
+        }
+
+        @Override public String toString() {
+            return "READ_FILTER_NEAR";
+        }
+    };
+
+    /** Read filter. */
+    public static final IgnitePredicate READ_FILTER_COLOCATED = new P1<IgniteTxEntry>() {
+        @Override public boolean apply(IgniteTxEntry e) {
+            return e.op() == READ && !e.context().isNear();
+        }
+
+        @Override public String toString() {
+            return "READ_FILTER_COLOCATED";
+        }
+    };
+
+    /** Write filter. */
+    public static final IgnitePredicate WRITE_FILTER = new P1<IgniteTxEntry>() {
+        @Override public boolean apply(IgniteTxEntry e) {
+            return e.op() != READ;
+        }
+
+        @Override public String toString() {
+            return "WRITE_FILTER";
+        }
+    };
+
+    /** Write filter. */
+    public static final IgnitePredicate WRITE_FILTER_NEAR = new P1<IgniteTxEntry>() {
+        @Override public boolean apply(IgniteTxEntry e) {
+            return e.op() != READ && e.context().isNear();
+        }
+
+        @Override public String toString() {
+            return "WRITE_FILTER_NEAR";
+        }
+    };
+
+    /** Write filter. */
+    public static final IgnitePredicate WRITE_FILTER_COLOCATED = new P1<IgniteTxEntry>() {
+        @Override public boolean apply(IgniteTxEntry e) {
+            return e.op() != READ && !e.context().isNear();
         }
 
         @Override public String toString() {
-            return "Cache transaction read filter";
+            return "WRITE_FILTER_COLOCATED";
         }
     };
 
     /** Write filter. */
-    private static final IgnitePredicate WRITE_FILTER = new P1<Object>() {
-        @Override public boolean apply(Object e) {
-            return ((IgniteTxEntry)e).op() != READ;
+    public static final IgnitePredicate FILTER_NEAR_CACHE_ENTRY = new P1<IgniteTxEntry>() {
+        @Override public boolean apply(IgniteTxEntry e) {
+            return e.context().isNear();
         }
 
         @Override public String toString() {
-            return "Cache transaction write filter";
+            return "FILTER_NEAR_CACHE_ENTRY";
         }
     };
 
@@ -613,7 +660,7 @@ public class GridCacheUtils {
      * @return Filter for transaction reads.
      */
     @SuppressWarnings({"unchecked"})
-    public static <K, V> IgnitePredicate<IgniteTxEntry> reads() {
+    public static IgnitePredicate<IgniteTxEntry> reads() {
         return READ_FILTER;
     }
 
@@ -621,7 +668,7 @@ public class GridCacheUtils {
      * @return Filter for transaction writes.
      */
     @SuppressWarnings({"unchecked"})
-    public static <K, V> IgnitePredicate<IgniteTxEntry> writes() {
+    public static IgnitePredicate<IgniteTxEntry> writes() {
         return WRITE_FILTER;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index f8cec50..45903aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -52,8 +52,8 @@ public class GridDistributedTxMapping {
     /** {@code True} if this is last mapping for node. */
     private boolean last;
 
-    /** {@code True} if mapping is for near caches, {@code false} otherwise. */
-    private boolean near;
+    /** Near cache entries count. */
+    private int nearEntries;
 
     /** {@code True} if this is first mapping for optimistic tx on client node. */
     private boolean clientFirst;
@@ -96,17 +96,17 @@ public class GridDistributedTxMapping {
     }
 
     /**
-     * @return {@code True} if mapping is for near caches, {@code false} otherwise.
+     * @return {@code True} if has colocated cache entries.
      */
-    public boolean near() {
-        return near;
+    public boolean hasColocatedCacheEntries() {
+        return entries.size() > nearEntries;
     }
 
     /**
-     * @param near {@code True} if mapping is for near caches, {@code false} otherwise.
+     * @return {@code True} if has near cache entries.
      */
-    public void near(boolean near) {
-        this.near = near;
+    public boolean hasNearCacheEntries() {
+        return nearEntries > 0;
     }
 
     /**
@@ -124,6 +124,15 @@ public class GridDistributedTxMapping {
     }
 
     /**
+     * @return Near cache entries.
+     */
+    @Nullable public Collection<IgniteTxEntry> nearCacheEntries() {
+        assert nearEntries > 0;
+
+        return F.view(entries, CU.FILTER_NEAR_CACHE_ENTRY);
+    }
+
+    /**
      * @return {@code True} if lock is explicit.
      */
     public boolean explicitLock() {
@@ -159,21 +168,58 @@ public class GridDistributedTxMapping {
      * @return Reads.
      */
     public Collection<IgniteTxEntry> reads() {
-        return F.view(entries, CU.reads());
+        return F.view(entries, CU.READ_FILTER);
     }
 
     /**
      * @return Writes.
      */
     public Collection<IgniteTxEntry> writes() {
-        return F.view(entries, CU.writes());
+        return F.view(entries, CU.WRITE_FILTER);
+    }
+
+    /**
+     * @return Near cache reads.
+     */
+    public Collection<IgniteTxEntry> nearEntriesReads() {
+        assert hasNearCacheEntries();
+
+        return F.view(entries, CU.READ_FILTER_NEAR);
+    }
+
+    /**
+     * @return Near cache writes.
+     */
+    public Collection<IgniteTxEntry> nearEntriesWrites() {
+        assert hasNearCacheEntries();
+
+        return F.view(entries, CU.WRITE_FILTER_NEAR);
+    }
+
+    /**
+     * @return Colocated cache reads.
+     */
+    public Collection<IgniteTxEntry> colocatedEntriesReads() {
+        assert hasColocatedCacheEntries();
+
+        return F.view(entries, CU.READ_FILTER_COLOCATED);
+    }
+
+    /**
+     * @return Colocated cache writes.
+     */
+    public Collection<IgniteTxEntry> colocatedEntriesWrites() {
+        assert hasColocatedCacheEntries();
+
+        return F.view(entries, CU.WRITE_FILTER_COLOCATED);
     }
 
     /**
      * @param entry Adds entry.
      */
     public void add(IgniteTxEntry entry) {
-        entries.add(entry);
+        if (entries.add(entry) && entry.context().isNear())
+            nearEntries++;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index de8b29e..9cb04d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -387,7 +387,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
         }
 
         try {
-            cctx.tm().prepareTx(this);
+            cctx.tm().prepareTx(this, null);
 
             if (pessimistic() || isSystemInvalidate())
                 state(PREPARED);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index b1c7e5b..26f08fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -399,7 +399,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
                     addEntry(msgId, e);
             }
 
-            userPrepare();
+            userPrepare(null);
 
             // Make sure to add future before calling prepare on it.
             cctx.mvcc().addFuture(fut);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 6e7b324..464df6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -920,6 +920,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
             if (res.hasOwnedValue(ver.getKey()))
                 continue;
 
+            assert txEntry != null : ver;
+
             GridCacheContext cacheCtx = txEntry.context();
 
             while (true) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index cbd9d23..6060729 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -55,7 +55,6 @@ import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteReducer;
 import org.jetbrains.annotations.Nullable;
 
@@ -199,7 +198,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             MiniFuture mini = miniFuture(res.miniId());
 
             if (mini != null)
-                mini.onResult(res);
+                mini.onResult(res, true);
         }
     }
 
@@ -339,11 +338,17 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
         GridDhtTxMapping txMapping = new GridDhtTxMapping();
 
-        Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
+        Map<UUID, GridDistributedTxMapping> mappings = new HashMap<>();
 
-        for (IgniteTxEntry write : writes)
+        boolean hasNearCache = false;
+
+        for (IgniteTxEntry write : writes) {
             map(write, topVer, mappings, txMapping, remap, topLocked);
 
+            if (write.context().isNear())
+                hasNearCache = true;
+        }
+
         for (IgniteTxEntry read : reads)
             map(read, topVer, mappings, txMapping, remap, topLocked);
 
@@ -363,12 +368,26 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
         tx.transactionNodes(txMapping.transactionNodes());
 
-        checkOnePhase(txMapping);
+        if (!hasNearCache)
+            checkOnePhase(txMapping);
 
+        MiniFuture locNearEntriesFut = null;
+
+        // Create futures in advance to have all futures when process {@link GridNearTxPrepareResponse#clientRemapVersion}.
         for (GridDistributedTxMapping m : mappings.values()) {
             assert !m.empty();
 
-            add(new MiniFuture(this, m, ++miniId));
+            MiniFuture fut = new MiniFuture(this, m, ++miniId);
+
+            add(fut);
+
+            if (m.primary().isLocal() && m.hasNearCacheEntries() && m.hasColocatedCacheEntries()) {
+                assert locNearEntriesFut == null;
+
+                locNearEntriesFut = fut;
+
+                add(new MiniFuture(this, m, ++miniId));
+            }
         }
 
         Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
@@ -383,7 +402,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
             MiniFuture fut = (MiniFuture)fut0;
 
-            IgniteCheckedException err = prepare(fut, txMapping);
+            IgniteCheckedException err = prepare(fut, txMapping.transactionNodes(), locNearEntriesFut);
 
             if (err != null) {
                 while (it.hasNext()) {
@@ -417,9 +436,13 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
     /**
      * @param fut Mini future.
+     * @param txNodes Tx nodes.
+     * @param locNearEntriesFut Local future for near cache entries prepare.
      * @return Prepare error if any.
      */
-    @Nullable private IgniteCheckedException prepare(final MiniFuture fut, GridDhtTxMapping txMapping) {
+    @Nullable private IgniteCheckedException prepare(final MiniFuture fut,
+        Map<UUID, Collection<UUID>> txNodes,
+        @Nullable MiniFuture locNearEntriesFut) {
         GridDistributedTxMapping m = fut.mapping();
 
         final ClusterNode primary = m.primary();
@@ -434,36 +457,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             return err;
         }
 
-        GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
-            futId,
-            tx.topologyVersion(),
-            tx,
-            timeout,
-            m.reads(),
-            m.writes(),
-            m.near(),
-            txMapping.transactionNodes(),
-            m.last(),
-            tx.onePhaseCommit(),
-            tx.needReturnValue() && tx.implicit(),
-            tx.implicitSingle(),
-            m.explicitLock(),
-            tx.subjectId(),
-            tx.taskNameHash(),
-            m.clientFirst(),
-            tx.activeCachesDeploymentEnabled());
-
-        for (IgniteTxEntry txEntry : m.entries()) {
-            if (txEntry.op() == TRANSFORM)
-                req.addDhtVersion(txEntry.txKey(), null);
-        }
-
         // Must lock near entries separately.
-        if (m.near()) {
+        if (m.hasNearCacheEntries()) {
             try {
-                tx.optimisticLockEntries(m.entries());
-
-                tx.userPrepare();
+                cctx.tm().prepareTx(tx, m.nearCacheEntries());
             }
             catch (IgniteCheckedException e) {
                 fut.onResult(e);
@@ -472,27 +469,36 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             }
         }
 
-        req.miniId(fut.futureId());
-
-        // If this is the primary node for the keys.
         if (primary.isLocal()) {
-            IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(primary.id(),
-                tx,
-                req);
-
-            prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
-                @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
-                    try {
-                        fut.onResult(prepFut.get());
-                    }
-                    catch (IgniteCheckedException e) {
-                        fut.onResult(e);
-                    }
-                }
-            });
+            if (locNearEntriesFut != null) {
+                boolean nearEntries = fut == locNearEntriesFut;
+
+                GridNearTxPrepareRequest req = createRequest(txNodes,
+                    fut,
+                    timeout,
+                    nearEntries ? m.nearEntriesReads() : m.colocatedEntriesReads(),
+                    nearEntries ? m.nearEntriesWrites() : m.colocatedEntriesWrites());
+
+                prepareLocal(req, fut, nearEntries);
+            }
+            else {
+                GridNearTxPrepareRequest req = createRequest(txNodes,
+                    fut,
+                    timeout,
+                    m.reads(),
+                    m.writes());
+
+                prepareLocal(req, fut, m.hasNearCacheEntries());
+            }
         }
         else {
             try {
+                GridNearTxPrepareRequest req = createRequest(txNodes,
+                    fut,
+                    timeout,
+                    m.reads(),
+                    m.writes());
+
                 cctx.io().send(primary, req, tx.ioPolicy());
             }
             catch (ClusterTopologyCheckedException e) {
@@ -513,16 +519,86 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     }
 
     /**
+     * @param txNodes Tx nodes.
+     * @param fut Future.
+     * @param timeout Timeout.
+     * @param reads Read entries.
+     * @param writes Write entries.
+     * @return Request.
+     */
+    private GridNearTxPrepareRequest createRequest(
+        Map<UUID, Collection<UUID>> txNodes,
+        MiniFuture fut,
+        long timeout,
+        Collection<IgniteTxEntry> reads,
+        Collection<IgniteTxEntry> writes) {
+        GridDistributedTxMapping m = fut.mapping();
+
+        GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
+            futId,
+            tx.topologyVersion(),
+            tx,
+            timeout,
+            reads,
+            writes,
+            m.hasNearCacheEntries(),
+            txNodes,
+            m.last(),
+            tx.onePhaseCommit(),
+            tx.needReturnValue() && tx.implicit(),
+            tx.implicitSingle(),
+            m.explicitLock(),
+            tx.subjectId(),
+            tx.taskNameHash(),
+            m.clientFirst(),
+            tx.activeCachesDeploymentEnabled());
+
+        for (IgniteTxEntry txEntry : writes) {
+            if (txEntry.op() == TRANSFORM)
+                req.addDhtVersion(txEntry.txKey(), null);
+        }
+
+        req.miniId(fut.futureId());
+
+        return req;
+    }
+
+    /**
+     * @param req Request.
+     * @param fut Future.
+     * @param nearEntries {@code True} if prepare near cache entries.
+     */
+    private void prepareLocal(GridNearTxPrepareRequest req,
+        final MiniFuture fut,
+        final boolean nearEntries) {
+        IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = nearEntries ?
+            cctx.tm().txHandler().prepareNearTx(cctx.localNodeId(), req, true) :
+            cctx.tm().txHandler().prepareColocatedTx(tx, req);
+
+        prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+            @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
+                try {
+                    fut.onResult(prepFut.get(), nearEntries);
+                }
+                catch (IgniteCheckedException e) {
+                    fut.onResult(e);
+                }
+            }
+        });
+    }
+
+    /**
      * @param entry Transaction entry.
      * @param topVer Topology version.
      * @param curMapping Current mapping.
+     * @param txMapping Mapping.
      * @param remap Remap flag.
      * @param topLocked Topology locked flag.
      */
     private void map(
         IgniteTxEntry entry,
         AffinityTopologyVersion topVer,
-        Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> curMapping,
+        Map<UUID, GridDistributedTxMapping> curMapping,
         GridDhtTxMapping txMapping,
         boolean remap,
         boolean topLocked
@@ -565,30 +641,25 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             }
         }
 
-        IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, cacheCtx.isNear());
-
-        GridDistributedTxMapping cur = curMapping.get(key);
+        GridDistributedTxMapping cur = curMapping.get(primary.id());
 
         if (cur == null) {
             cur = new GridDistributedTxMapping(primary);
 
-            curMapping.put(key, cur);
-
-            if (primary.isLocal()) {
-                if (entry.context().isNear())
-                    tx.nearLocallyMapped(true);
-                else if (entry.context().isColocated())
-                    tx.colocatedLocallyMapped(true);
-            }
-
-            // Initialize near flag right away.
-            cur.near(cacheCtx.isNear());
+            curMapping.put(primary.id(), cur);
 
             cur.clientFirst(!topLocked && cctx.kernalContext().clientNode());
 
             cur.last(true);
         }
 
+        if (primary.isLocal()) {
+            if (cacheCtx.isNear())
+                tx.nearLocallyMapped(true);
+            else if (cacheCtx.isColocated())
+                tx.colocatedLocallyMapped(true);
+        }
+
         cur.add(entry);
 
         if (entry.explicitVersion() != null) {
@@ -683,9 +754,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
      *
      */
     private static class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
         /** Receive result flag updater. */
         private static AtomicIntegerFieldUpdater<MiniFuture> RCV_RES_UPD =
             AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes");
@@ -773,9 +841,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
         /**
          * @param res Result callback.
+         * @param updateMapping Update mapping flag.
          */
         @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
-        void onResult(final GridNearTxPrepareResponse res) {
+        void onResult(final GridNearTxPrepareResponse res, boolean updateMapping) {
             if (isDone())
                 return;
 
@@ -878,7 +947,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                             onDone(res);
                     }
                     else {
-                        parent.onPrepareResponse(m, res);
+                        parent.onPrepareResponse(m, res, updateMapping);
 
                         // Finish this mini future (need result only on client node).
                         onDone(parent.cctx.kernalContext().clientNode() ? res : null);
@@ -892,8 +961,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
          */
         private void remap(final GridNearTxPrepareResponse res) {
             parent.prepareOnTopology(true, new Runnable() {
-                @Override
-                public void run() {
+                @Override public void run() {
                     onDone(res);
                 }
             });

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index bc47c13..f4ce1ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -345,6 +345,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
     /**
      * @param write Write.
      * @param topLocked {@code True} if thread already acquired lock preventing topology change.
+     * @param remap Remap flag.
      */
     private void prepareSingle(IgniteTxEntry write, boolean topLocked, boolean remap) {
         write.clearEntryReadVersion();
@@ -382,7 +383,10 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
 
         tx.transactionNodes(txMapping.transactionNodes());
 
-        checkOnePhase(txMapping);
+        if (!write.context().isNear())
+            checkOnePhase(txMapping);
+
+        assert !(mapping.hasColocatedCacheEntries() && mapping.hasNearCacheEntries()) : mapping;
 
         proceedPrepare(mapping, null);
     }
@@ -390,6 +394,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
     /**
      * @param writes Write entries.
      * @param topLocked {@code True} if thread already acquired lock preventing topology change.
+     * @param remap Remap flag.
      */
     private void prepare(
         Iterable<IgniteTxEntry> writes,
@@ -402,24 +407,37 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
 
         txMapping = new GridDhtTxMapping();
 
-        Map<UUID, GridDistributedTxMapping> map = new HashMap<>();
+        Map<Object, GridDistributedTxMapping> map = new HashMap<>();
 
         // Assign keys to primary nodes.
         GridDistributedTxMapping cur = null;
 
         Queue<GridDistributedTxMapping> mappings = new ArrayDeque<>();
 
+        boolean hasNearCache = false;
+
         for (IgniteTxEntry write : writes) {
             write.clearEntryReadVersion();
 
             GridDistributedTxMapping updated = map(write, topVer, cur, topLocked, remap);
 
+            if (write.context().isNear())
+                hasNearCache = true;
+
             if (cur != updated) {
                 mappings.offer(updated);
 
                 updated.last(true);
 
-                GridDistributedTxMapping prev = map.put(updated.primary().id(), updated);
+                ClusterNode primary = updated.primary();
+
+                assert !primary.isLocal() || !cctx.kernalContext().clientNode();
+
+                // Minor optimization to not create MappingKey: on client node can not have mapping for local node.
+                Object key =  cctx.kernalContext().clientNode() ? primary.id() :
+                    new MappingKey(primary.id(), primary.isLocal() && updated.hasNearCacheEntries());
+
+                GridDistributedTxMapping prev = map.put(key, updated);
 
                 if (prev != null)
                     prev.last(false);
@@ -451,7 +469,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
 
         tx.transactionNodes(txMapping.transactionNodes());
 
-        checkOnePhase(txMapping);
+        if (!hasNearCache)
+            checkOnePhase(txMapping);
 
         proceedPrepare(mappings);
     }
@@ -497,7 +516,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                     timeout,
                     null,
                     m.writes(),
-                    m.near(),
+                    m.hasNearCacheEntries(),
                     txMapping.transactionNodes(),
                     m.last(),
                     tx.onePhaseCommit(),
@@ -515,14 +534,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                 }
 
                 // Must lock near entries separately.
-                if (m.near()) {
+                if (m.hasNearCacheEntries()) {
                     try {
-                        tx.optimisticLockEntries(req.writes());
-
-                        tx.userPrepare();
+                        cctx.tm().prepareTx(tx, m.nearCacheEntries());
                     }
                     catch (IgniteCheckedException e) {
                         onError(e, false);
+
+                        return;
                     }
                 }
 
@@ -532,13 +551,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
 
                 add(fut); // Append new future.
 
-                // If this is the primary node for the keys.
                 if (n.isLocal()) {
-                    // At this point, if any new node joined, then it is
-                    // waiting for this transaction to complete, so
-                    // partition reassignments are not possible here.
+                    assert !(m.hasColocatedCacheEntries() && m.hasNearCacheEntries()) : m;
+
                     IgniteInternalFuture<GridNearTxPrepareResponse> prepFut =
-                        cctx.tm().txHandler().prepareTx(n.id(), tx, req);
+                        m.hasNearCacheEntries() ? cctx.tm().txHandler().prepareNearTx(n.id(), req, true)
+                        : cctx.tm().txHandler().prepareColocatedTx(tx, req);
 
                     prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
                         @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
@@ -590,6 +608,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
      * @param topVer Topology version.
      * @param cur Current mapping.
      * @param topLocked {@code True} if thread already acquired lock preventing topology change.
+     * @param remap Remap flag.
      * @return Mapping.
      */
     private GridDistributedTxMapping map(
@@ -644,14 +663,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
             }
         }
 
-        if (cur == null || !cur.primary().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
+        if (cur == null || !cur.primary().id().equals(primary.id()) ||
+            (primary.isLocal() && cur.hasNearCacheEntries() != cacheCtx.isNear())) {
             boolean clientFirst = cur == null && !topLocked && cctx.kernalContext().clientNode();
 
             cur = new GridDistributedTxMapping(primary);
 
-            // Initialize near flag right away.
-            cur.near(cacheCtx.isNear());
-
             cur.clientFirst(clientFirst);
         }
 
@@ -908,7 +925,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                             remap();
                     }
                     else {
-                        parent.onPrepareResponse(m, res);
+                        parent.onPrepareResponse(m, res, m.hasNearCacheEntries());
 
                         // Proceed prepare before finishing mini future.
                         if (mappings != null)
@@ -937,4 +954,44 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
             return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
         }
     }
+
+    /**
+     *
+     */
+    private static class MappingKey {
+        /** */
+        private final UUID nodeId;
+
+        /** */
+        private final boolean nearEntries;
+
+        /**
+         * @param nodeId Node ID.
+         * @param nearEntries Near cache entries flag (should be true only for local node).
+         */
+        MappingKey(UUID nodeId, boolean nearEntries) {
+            this.nodeId = nodeId;
+            this.nearEntries = nearEntries;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
+        @Override public boolean equals(Object o) {
+            MappingKey that = (MappingKey) o;
+
+            return nearEntries == that.nearEntries && nodeId.equals(that.nodeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = nodeId.hashCode();
+            res = 31 * res + (nearEntries ? 1 : 0);
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MappingKey.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index cb15bca..e934319 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -44,7 +45,6 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -102,7 +102,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
             if (f != null) {
                 assert f.primary().id().equals(nodeId);
 
-                f.onResult(res);
+                f.onResult(res, true);
             }
             else {
                 if (msgLog.isDebugEnabled()) {
@@ -169,7 +169,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
         }
 
         try {
-            tx.userPrepare();
+            tx.userPrepare(Collections.<IgniteTxEntry>emptyList());
 
             cctx.mvcc().addFuture(this);
 
@@ -181,20 +181,97 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
     }
 
     /**
+     * @param txNodes Tx nodes.
+     * @param m Mapping.
+     * @param timeout Timeout.
+     * @param reads Reads.
+     * @param writes Writes.
+     * @return Request.
+     */
+    private GridNearTxPrepareRequest createRequest(Map<UUID, Collection<UUID>> txNodes,
+        GridDistributedTxMapping m,
+        long timeout,
+        Collection<IgniteTxEntry> reads,
+        Collection<IgniteTxEntry> writes) {
+        GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
+            futId,
+            tx.topologyVersion(),
+            tx,
+            timeout,
+            reads,
+            writes,
+            m.hasNearCacheEntries(),
+            txNodes,
+            true,
+            tx.onePhaseCommit(),
+            tx.needReturnValue() && tx.implicit(),
+            tx.implicitSingle(),
+            m.explicitLock(),
+            tx.subjectId(),
+            tx.taskNameHash(),
+            false,
+            tx.activeCachesDeploymentEnabled());
+
+        for (IgniteTxEntry txEntry : writes) {
+            if (txEntry.op() == TRANSFORM)
+                req.addDhtVersion(txEntry.txKey(), null);
+        }
+
+        return req;
+    }
+
+    /**
+     * @param req Request.
+     * @param m Mapping.
+     * @param miniId Mini future ID.
+     * @param nearEntries {@code True} if prepare near cache entries.
+     */
+    private void prepareLocal(GridNearTxPrepareRequest req,
+        GridDistributedTxMapping m,
+        int miniId,
+        final boolean nearEntries) {
+        final MiniFuture fut = new MiniFuture(m, miniId);
+
+        req.miniId(fut.futureId());
+
+        add(fut);
+
+        IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = nearEntries ?
+            cctx.tm().txHandler().prepareNearTx(cctx.localNodeId(), req, true) :
+            cctx.tm().txHandler().prepareColocatedTx(tx, req);
+
+        prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+            @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
+                try {
+                    fut.onResult(prepFut.get(), nearEntries);
+                }
+                catch (IgniteCheckedException e) {
+                    fut.onError(e);
+                }
+            }
+        });
+    }
+
+    /**
      *
      */
     private void preparePessimistic() {
-        Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
+        Map<UUID, GridDistributedTxMapping> mappings = new HashMap<>();
 
         AffinityTopologyVersion topVer = tx.topologyVersion();
 
         GridDhtTxMapping txMapping = new GridDhtTxMapping();
 
+        boolean hasNearCache = false;
+
         for (IgniteTxEntry txEntry : tx.allEntries()) {
             txEntry.clearEntryReadVersion();
 
             GridCacheContext cacheCtx = txEntry.context();
 
+            if (cacheCtx.isNear())
+                hasNearCache = true;
+
             List<ClusterNode> nodes;
 
             if (!cacheCtx.isLocal()) {
@@ -205,21 +282,14 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
             else
                 nodes = cacheCtx.affinity().nodesByKey(txEntry.key(), topVer);
 
-            ClusterNode primary = F.first(nodes);
-
-            boolean near = cacheCtx.isNear();
+            assert !nodes.isEmpty();
 
-            IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, near);
+            ClusterNode primary = nodes.get(0);
 
-            GridDistributedTxMapping nodeMapping = mappings.get(key);
+            GridDistributedTxMapping nodeMapping = mappings.get(primary.id());
 
-            if (nodeMapping == null) {
-                nodeMapping = new GridDistributedTxMapping(primary);
-
-                nodeMapping.near(cacheCtx.isNear());
-
-                mappings.put(key, nodeMapping);
-            }
+            if (nodeMapping == null)
+                mappings.put(primary.id(), nodeMapping = new GridDistributedTxMapping(primary));
 
             txEntry.nodeId(primary.id());
 
@@ -230,7 +300,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
         tx.transactionNodes(txMapping.transactionNodes());
 
-        checkOnePhase(txMapping);
+        if (!hasNearCache)
+            checkOnePhase(txMapping);
 
         long timeout = tx.remainingTime();
 
@@ -242,56 +313,48 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
         int miniId = 0;
 
+        Map<UUID, Collection<UUID>> txNodes = txMapping.transactionNodes();
+
         for (final GridDistributedTxMapping m : mappings.values()) {
             final ClusterNode primary = m.primary();
 
-            GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
-                futId,
-                tx.topologyVersion(),
-                tx,
-                timeout,
-                m.reads(),
-                m.writes(),
-                m.near(),
-                txMapping.transactionNodes(),
-                true,
-                tx.onePhaseCommit(),
-                tx.needReturnValue() && tx.implicit(),
-                tx.implicitSingle(),
-                m.explicitLock(),
-                tx.subjectId(),
-                tx.taskNameHash(),
-                false,
-                tx.activeCachesDeploymentEnabled());
-
-            for (IgniteTxEntry txEntry : m.entries()) {
-                if (txEntry.op() == TRANSFORM)
-                    req.addDhtVersion(txEntry.txKey(), null);
+            if (primary.isLocal()) {
+                if (m.hasNearCacheEntries() && m.hasColocatedCacheEntries()) {
+                    GridNearTxPrepareRequest nearReq = createRequest(txMapping.transactionNodes(),
+                        m,
+                        timeout,
+                        m.nearEntriesReads(),
+                        m.nearEntriesWrites());
+
+                    prepareLocal(nearReq, m, ++miniId, true);
+
+                    GridNearTxPrepareRequest colocatedReq = createRequest(txNodes,
+                        m,
+                        timeout,
+                        m.colocatedEntriesReads(),
+                        m.colocatedEntriesWrites());
+
+                    prepareLocal(colocatedReq, m, ++miniId, false);
+                }
+                else {
+                    GridNearTxPrepareRequest req = createRequest(txNodes, m, timeout, m.reads(), m.writes());
+
+                    prepareLocal(req, m, ++miniId, m.hasNearCacheEntries());
+                }
             }
+            else {
+                GridNearTxPrepareRequest req = createRequest(txNodes,
+                    m,
+                    timeout,
+                    m.reads(),
+                    m.writes());
 
-            final MiniFuture fut = new MiniFuture(m, ++miniId);
+                final MiniFuture fut = new MiniFuture(m, ++miniId);
 
-            req.miniId(fut.futureId());
+                req.miniId(fut.futureId());
 
-            add(fut);
+                add(fut);
 
-            if (primary.isLocal()) {
-                IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(primary.id(),
-                    tx,
-                    req);
-
-                prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
-                    @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
-                        try {
-                            fut.onResult(prepFut.get());
-                        }
-                        catch (IgniteCheckedException e) {
-                            fut.onError(e);
-                        }
-                    }
-                });
-            }
-            else {
                 try {
                     cctx.io().send(primary, req, tx.ioPolicy());
 
@@ -395,12 +458,13 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
         /**
          * @param res Response.
+         * @param updateMapping Update mapping flag.
          */
-        void onResult(GridNearTxPrepareResponse res) {
+        void onResult(GridNearTxPrepareResponse res, boolean updateMapping) {
             if (res.error() != null)
                 onError(res.error());
             else {
-                onPrepareResponse(m, res);
+                onPrepareResponse(m, res, updateMapping);
 
                 onDone(res);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 37be0fb..89874ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -642,7 +642,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
      * @param mapping Mapping to finish.
      */
     private void readyNearMappingFromBackup(GridDistributedTxMapping mapping) {
-        if (mapping.near()) {
+        if (mapping.hasNearCacheEntries()) {
             GridCacheVersion xidVer = tx.xidVersion();
 
             mapping.dhtVersion(xidVer, xidVer);
@@ -676,7 +676,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
     private void finish(int miniId, GridDistributedTxMapping m, boolean commit) {
         ClusterNode n = m.primary();
 
-        assert !m.empty();
+        assert !m.empty() : m;
 
         CacheWriteSynchronizationMode syncMode = tx.syncMode();
 
@@ -698,7 +698,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
             m.explicitLock(),
             tx.storeEnabled(),
             tx.topologyVersion(),
-            completedVer, // Reuse 'baseVersion'  to do not add new fields in message.
+            completedVer, // Reuse 'baseVersion' to do not add new fields in message.
             null,
             null,
             tx.size(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 62af536..f795ddc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -151,9 +151,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
     @GridToStringExclude
     private volatile GridNearTxFinishFuture rollbackFut;
 
-    /** Entries to lock on next step of prepare stage. */
-    private Collection<IgniteTxEntry> optimisticLockEntries = Collections.emptyList();
-
     /** True if transaction contains near cache entries mapped to local node. */
     private boolean nearLocallyMapped;
 
@@ -2425,14 +2422,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
 
     /** {@inheritDoc} */
     @Override public Collection<IgniteTxEntry> optimisticLockEntries() {
-        return optimisticLockEntries;
-    }
+        assert false : "Should not be called";
 
-    /**
-     * @param optimisticLockEntries Optimistic lock entries.
-     */
-    void optimisticLockEntries(Collection<IgniteTxEntry> optimisticLockEntries) {
-        this.optimisticLockEntries = optimisticLockEntries;
+        throw new UnsupportedOperationException();
     }
 
     /**
@@ -2862,8 +2854,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                 if (m == null) {
                     mappings.put(m = new GridDistributedTxMapping(primary));
 
-                    m.near(map.near());
-
                     if (map.explicitLock())
                         m.markExplicitLock();
                 }
@@ -2889,8 +2879,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
 
         mappings.put(m);
 
-        m.near(map.near());
-
         if (map.explicitLock())
             m.markExplicitLock();
 
@@ -2933,14 +2921,16 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
         Collection<GridCacheVersion> committedVers,
         Collection<GridCacheVersion> rolledbackVers)
     {
+        assert mapping.hasNearCacheEntries() : mapping;
+
         // Process writes, then reads.
         for (IgniteTxEntry txEntry : mapping.entries()) {
-            if (CU.writes().apply(txEntry))
+            if (CU.WRITE_FILTER_NEAR.apply(txEntry))
                 readyNearLock(txEntry, mapping.dhtVersion(), pendingVers, committedVers, rolledbackVers);
         }
 
         for (IgniteTxEntry txEntry : mapping.entries()) {
-            if (CU.reads().apply(txEntry))
+            if (CU.READ_FILTER_NEAR.apply(txEntry))
                 readyNearLock(txEntry, mapping.dhtVersion(), pendingVers, committedVers, rolledbackVers);
         }
     }
@@ -2952,7 +2942,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
      * @param committedVers Committed versions.
      * @param rolledbackVers Rolled back versions.
      */
-    void readyNearLock(IgniteTxEntry txEntry,
+    private void readyNearLock(IgniteTxEntry txEntry,
         GridCacheVersion dhtVer,
         Collection<GridCacheVersion> pendingVers,
         Collection<GridCacheVersion> committedVers,
@@ -3333,11 +3323,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
             needReturnValue() && implicit());
 
         try {
-            // At this point all the entries passed in must be enlisted in transaction because this is an
-            // optimistic transaction.
-            optimisticLockEntries = (serializable() && optimistic()) ? F.concat(false, writes, reads) : writes;
-
-            userPrepare();
+            userPrepare((serializable() && optimistic()) ? F.concat(false, writes, reads) : writes);
 
             // Make sure to add future before calling prepare on it.
             cctx.mvcc().addFuture(fut);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 7f1f5a2..004e4da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -172,7 +172,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends
 
             Collection<UUID> backups = entry.getValue();
 
-            if (backups.size() <= 1)
+            if (backups.size() <= 1 && !tx.txState().hasNearCacheConfigured(cctx, tx.topologyVersion()))
                 tx.onePhaseCommit(true);
         }
     }
@@ -180,9 +180,12 @@ public abstract class GridNearTxPrepareFutureAdapter extends
     /**
      * @param m Mapping.
      * @param res Response.
+     * @param updateMapping Update mapping flag.
      */
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-    final void onPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
+    final void onPrepareResponse(GridDistributedTxMapping m,
+        GridNearTxPrepareResponse res,
+        boolean updateMapping) {
         if (res == null)
             return;
 
@@ -245,24 +248,25 @@ public abstract class GridNearTxPrepareFutureAdapter extends
         }
 
         if (!m.empty()) {
-            GridCacheVersion writeVer = res.writeVersion();
-
-            if (writeVer == null)
-                writeVer = res.dhtVersion();
-
             // This step is very important as near and DHT versions grow separately.
             cctx.versions().onReceived(nodeId, res.dhtVersion());
 
-            // Register DHT version.
-            m.dhtVersion(res.dhtVersion(), writeVer);
+            if (updateMapping && m.hasNearCacheEntries()) {
+                GridCacheVersion writeVer = res.writeVersion();
+
+                if (writeVer == null)
+                    writeVer = res.dhtVersion();
 
-            GridDistributedTxMapping map = tx.mappings().get(nodeId);
+                // Register DHT version.
+                m.dhtVersion(res.dhtVersion(), writeVer);
 
-            if (map != null)
-                map.dhtVersion(res.dhtVersion(), writeVer);
+                GridDistributedTxMapping map = tx.mappings().get(nodeId);
+
+                if (map != null)
+                    map.dhtVersion(res.dhtVersion(), writeVer);
 
-            if (m.near())
                 tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
+            }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 153ad04..a591517 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -111,20 +111,17 @@ public class IgniteTxHandler {
     /**
      * @param nearNodeId Node ID.
      * @param req Request.
-     * @return Prepare future.
      */
-    private IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) {
+    private void processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) {
         if (txPrepareMsgLog.isDebugEnabled()) {
             txPrepareMsgLog.debug("Received near prepare request [txId=" + req.version() +
                 ", node=" + nearNodeId + ']');
         }
 
-        IgniteInternalFuture<GridNearTxPrepareResponse> fut = prepareTx(nearNodeId, null, req);
+        IgniteInternalFuture<GridNearTxPrepareResponse> fut = prepareNearTx(nearNodeId, req, false);
 
         assert req.txState() != null || fut == null || fut.error() != null ||
             (ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null);
-
-        return fut;
     }
 
     /**
@@ -209,41 +206,13 @@ public class IgniteTxHandler {
     }
 
     /**
-     * @param nearNodeId Near node ID that initiated transaction.
-     * @param locTx Optional local transaction.
-     * @param req Near prepare request.
-     * @return Future for transaction.
-     */
-    public IgniteInternalFuture<GridNearTxPrepareResponse> prepareTx(
-        UUID nearNodeId,
-        @Nullable GridNearTxLocal locTx,
-        GridNearTxPrepareRequest req
-    ) {
-        assert nearNodeId != null;
-        assert req != null;
-
-        if (locTx != null) {
-            if (req.near()) {
-                // Make sure not to provide Near entries to DHT cache.
-                req.cloneEntries();
-
-                return prepareNearTx(nearNodeId, req);
-            }
-            else
-                return prepareColocatedTx(locTx, req);
-        }
-        else
-            return prepareNearTx(nearNodeId, req);
-    }
-
-    /**
      * Prepares local colocated tx.
      *
      * @param locTx Local transaction.
      * @param req Near prepare request.
      * @return Prepare future.
      */
-    private IgniteInternalFuture<GridNearTxPrepareResponse> prepareColocatedTx(
+    public IgniteInternalFuture<GridNearTxPrepareResponse> prepareColocatedTx(
         final GridNearTxLocal locTx,
         final GridNearTxPrepareRequest req
     ) {
@@ -308,16 +277,20 @@ public class IgniteTxHandler {
     }
 
     /**
-     * Prepares near transaction.
-     *
      * @param nearNodeId Near node ID that initiated transaction.
      * @param req Near prepare request.
+     * @param locReq Local request flag.
      * @return Prepare future.
      */
-    private IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx(
+    public IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx(
         final UUID nearNodeId,
-        final GridNearTxPrepareRequest req
+        final GridNearTxPrepareRequest req,
+        boolean locReq
     ) {
+        // Make sure not to provide Near entries to DHT cache.
+        if (locReq)
+            req.cloneEntries();
+
         ClusterNode nearNode = ctx.node(nearNodeId);
 
         if (nearNode == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index 5743bfb..36f5f2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
@@ -293,6 +294,11 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean hasNearCacheConfigured(GridCacheSharedContext ctx, AffinityTopologyVersion topVer) {
+        return cacheCtx != null ? ctx.discovery().hasNearCache(cacheCtx.cacheId(), topVer) : false;
+    }
+
+    /** {@inheritDoc} */
     public String toString() {
         return S.toString(IgniteTxImplicitSingleStateImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index a59ff51..5a708d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -225,7 +225,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteTxState txState() {
+    @Override public IgniteTxLocalState txState() {
         return txState;
     }
 
@@ -401,10 +401,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     }
 
     /**
+     * @param entries Entries to lock or {@code null} if use default {@link IgniteInternalTx#optimisticLockEntries()}.
      * @throws IgniteCheckedException If prepare step failed.
      */
     @SuppressWarnings({"CatchGenericClass"})
-    public void userPrepare() throws IgniteCheckedException {
+    public void userPrepare(@Nullable Collection<IgniteTxEntry> entries) throws IgniteCheckedException {
         if (state() != PREPARING) {
             if (remainingTime() == -1)
                 throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this);
@@ -420,7 +421,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         checkValid();
 
         try {
-            cctx.tm().prepareTx(this);
+            cctx.tm().prepareTx(this, entries);
         }
         catch (IgniteCheckedException e) {
             throw e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
index 123d396..fe9fcbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+
 /**
  *
  */
@@ -41,4 +44,11 @@ public interface IgniteTxLocalState extends IgniteTxState {
      *
      */
     public void seal();
+
+    /**
+     * @param ctx Context.
+     * @param topVer Topology version.
+     * @return {@code True} if tx has cache with created near cache.
+     */
+    public boolean hasNearCacheConfigured(GridCacheSharedContext ctx, AffinityTopologyVersion topVer);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index da49c06..2da8dee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -774,12 +774,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * Handles prepare stage of 2PC.
+     * Handles prepare stage.
      *
      * @param tx Transaction to prepare.
+     * @param entries Entries to lock or {@code null} if use default {@link IgniteInternalTx#optimisticLockEntries()}.
      * @throws IgniteCheckedException If preparation failed.
      */
-    public void prepareTx(IgniteInternalTx tx) throws IgniteCheckedException {
+    public void prepareTx(IgniteInternalTx tx, @Nullable Collection<IgniteTxEntry> entries) throws IgniteCheckedException {
         if (tx.state() == MARKED_ROLLBACK) {
             if (tx.remainingTime() == -1)
                 throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this);
@@ -799,7 +800,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         // Optimistic.
         assert tx.optimistic() || !tx.local();
 
-        if (!lockMultiple(tx, tx.optimisticLockEntries())) {
+        if (!lockMultiple(tx, entries != null ? entries : tx.optimisticLockEntries())) {
             tx.setRollbackOnly();
 
             throw new IgniteTxOptimisticCheckedException("Failed to prepare transaction (lock conflict): " + tx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
index 7a45b6e..ed2526e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -85,7 +85,7 @@ public interface IgniteTxState {
     public boolean hasNearCache(GridCacheSharedContext cctx);
 
     /**
-     * @param cacheCtx Ccntext.
+     * @param cacheCtx Context.
      * @param tx Transaction.
      * @throws IgniteCheckedException If cache check failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index 304473e..3679208 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -28,12 +28,14 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheInterceptor;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
-import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -52,7 +54,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC
  */
 public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     /** Active cache IDs. */
-    private GridLongList activeCacheIds = new GridLongList();
+    private GridIntList activeCacheIds = new GridIntList();
 
     /** Per-transaction read map. */
     @GridToStringInclude
@@ -77,13 +79,13 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
 
     /** {@inheritDoc} */
     @Nullable @Override public Integer firstCacheId() {
-        return activeCacheIds.isEmpty() ? null : (int)activeCacheIds.get(0);
+        return activeCacheIds.isEmpty() ? null : activeCacheIds.get(0);
     }
 
     /** {@inheritDoc} */
     @Override public void unwindEvicts(GridCacheSharedContext cctx) {
         for (int i = 0; i < activeCacheIds.size(); i++) {
-            int cacheId = (int) activeCacheIds.get(i);
+            int cacheId = activeCacheIds.get(i);
 
             GridCacheContext ctx = cctx.cacheContext(cacheId);
 
@@ -95,7 +97,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     /** {@inheritDoc} */
     @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) {
         if (activeCacheIds.size() == 1) {
-            int cacheId = (int)activeCacheIds.get(0);
+            int cacheId = activeCacheIds.get(0);
 
             return cctx.cacheContext(cacheId);
         }
@@ -106,7 +108,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     /** {@inheritDoc} */
     @Override public void awaitLastFut(GridCacheSharedContext cctx) {
         for (int i = 0; i < activeCacheIds.size(); i++) {
-            int cacheId = (int)activeCacheIds.get(i);
+            int cacheId = activeCacheIds.get(i);
 
             cctx.cacheContext(cacheId).cache().awaitLastFut();
         }
@@ -157,7 +159,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
         }
 
         for (int i = 0; i < activeCacheIds.size(); i++) {
-            int cacheId = (int)activeCacheIds.get(i);
+            int cacheId = activeCacheIds.get(i);
 
             GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
 
@@ -175,7 +177,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
         CacheWriteSynchronizationMode syncMode = CacheWriteSynchronizationMode.FULL_ASYNC;
 
         for (int i = 0; i < activeCacheIds.size(); i++) {
-            int cacheId = (int)activeCacheIds.get(i);
+            int cacheId = activeCacheIds.get(i);
 
             CacheWriteSynchronizationMode cacheSyncMode =
                 cctx.cacheContext(cacheId).config().getWriteSynchronizationMode();
@@ -202,7 +204,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     /** {@inheritDoc} */
     @Override public boolean hasNearCache(GridCacheSharedContext cctx) {
         for (int i = 0; i < activeCacheIds.size(); i++) {
-            int cacheId = (int)activeCacheIds.get(i);
+            int cacheId = activeCacheIds.get(i);
 
             GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 
@@ -236,7 +238,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
                 int idx = 0;
 
                 for (int i = 0; i < activeCacheIds.size(); i++) {
-                    int activeCacheId = (int)activeCacheIds.get(i);
+                    int activeCacheId = activeCacheIds.get(i);
 
                     cacheNames.append(cctx.cacheContext(activeCacheId).name());
 
@@ -267,7 +269,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
         GridCacheContext<?, ?> nonLocCtx = null;
 
         for (int i = 0; i < activeCacheIds.size(); i++) {
-            int cacheId = (int)activeCacheIds.get(i);
+            int cacheId = activeCacheIds.get(i);
 
             GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
 
@@ -299,7 +301,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
             GridCacheContext<?, ?> nonLocCtx = null;
 
             for (int i = 0; i < activeCacheIds.size(); i++) {
-                int cacheId = (int)activeCacheIds.get(i);
+                int cacheId = activeCacheIds.get(i);
 
                 GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
 
@@ -319,7 +321,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     @Override public boolean storeWriteThrough(GridCacheSharedContext cctx) {
         if (!activeCacheIds.isEmpty()) {
             for (int i = 0; i < activeCacheIds.size(); i++) {
-                int cacheId = (int)activeCacheIds.get(i);
+                int cacheId = activeCacheIds.get(i);
 
                 CacheStoreManager store = cctx.cacheContext(cacheId).store();
 
@@ -334,7 +336,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     /** {@inheritDoc} */
     @Override public boolean hasInterceptor(GridCacheSharedContext cctx) {
         for (int i = 0; i < activeCacheIds.size(); i++) {
-            int cacheId = (int)activeCacheIds.get(i);
+            int cacheId = activeCacheIds.get(i);
 
             CacheInterceptor interceptor = cctx.cacheContext(cacheId).config().getInterceptor();
 
@@ -347,13 +349,13 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
 
     /** {@inheritDoc} */
     @Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) {
-        GridLongList cacheIds = activeCacheIds;
+        GridIntList cacheIds = activeCacheIds;
 
         if (!cacheIds.isEmpty()) {
             Collection<CacheStoreManager> stores = new ArrayList<>(cacheIds.size());
 
             for (int i = 0; i < cacheIds.size(); i++) {
-                int cacheId = (int)cacheIds.get(i);
+                int cacheId = cacheIds.get(i);
 
                 CacheStoreManager store = cctx.cacheContext(cacheId).store();
 
@@ -370,7 +372,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     /** {@inheritDoc} */
     @Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit) {
         for (int i = 0; i < activeCacheIds.size(); i++) {
-            int cacheId = (int)activeCacheIds.get(i);
+            int cacheId = activeCacheIds.get(i);
 
             GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 
@@ -474,6 +476,22 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean hasNearCacheConfigured(GridCacheSharedContext ctx, AffinityTopologyVersion topVer) {
+        DiscoCache discoCache = ctx.discovery().discoCache(topVer);
+
+        assert discoCache != null : topVer;
+
+        for (int i = 0; i < activeCacheIds.size(); i++) {
+            int cacheId = activeCacheIds.get(i);
+
+            if (discoCache.hasNearCache(cacheId))
+                return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
     public String toString() {
         return S.toString(IgniteTxStateImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateCollectionView.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateCollectionView.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateCollectionView.java
index b4785a7..ff0105b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateCollectionView.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateCollectionView.java
@@ -22,7 +22,6 @@ import java.util.Iterator;
 import org.apache.ignite.internal.util.GridSerializableCollection;
 import org.apache.ignite.internal.util.lang.GridFunc;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.NotNull;
 
@@ -45,6 +44,7 @@ public class PredicateCollectionView<T> extends GridSerializableCollection<T> {
      * @param col Input col that serves as a base for the view.
      * @param preds Optional preds. If preds are not provided - all elements will be in the view.
      */
+    @SafeVarargs
     public PredicateCollectionView(Collection<T> col, IgnitePredicate<? super T>... preds) {
         this.col = col;
         this.preds = preds;
@@ -70,9 +70,4 @@ public class PredicateCollectionView<T> extends GridSerializableCollection<T> {
     @Override public boolean isEmpty() {
         return F.isEmpty(preds) ? col.isEmpty() : !iterator().hasNext();
     }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(PredicateCollectionView.class, this);
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateMapView.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateMapView.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateMapView.java
index d5b97a6..01e6d8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateMapView.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateMapView.java
@@ -24,7 +24,6 @@ import org.apache.ignite.internal.util.GridSerializableMap;
 import org.apache.ignite.internal.util.GridSerializableSet;
 import org.apache.ignite.internal.util.lang.GridFunc;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -113,9 +112,4 @@ public class PredicateMapView<K, V> extends GridSerializableMap<K, V> {
     @Override public boolean containsKey(Object key) {
         return GridFunc.isAll((K)key, preds) && map.containsKey(key);
     }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(PredicateMapView.class, this);
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateSetView.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateSetView.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateSetView.java
index 99fc2fd..8937107 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateSetView.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateSetView.java
@@ -23,7 +23,6 @@ import org.apache.ignite.internal.util.GridSerializableMap;
 import org.apache.ignite.internal.util.GridSerializableSet;
 import org.apache.ignite.internal.util.lang.GridFunc;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.NotNull;
@@ -145,9 +144,4 @@ public class PredicateSetView<K, V> extends GridSerializableMap<K, V> {
     @Override public boolean containsKey(Object key) {
         return GridFunc.isAll((K)key, preds) && set.contains(key);
     }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(PredicateSetView.class, this);
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView.java
index 8186914..d8aa1d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView.java
@@ -22,7 +22,6 @@ import java.util.Iterator;
 import org.apache.ignite.internal.util.GridSerializableCollection;
 import org.apache.ignite.internal.util.GridSerializableIterator;
 import org.apache.ignite.internal.util.lang.GridFunc;
-import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -87,9 +86,4 @@ public class ReadOnlyCollectionView<T> extends GridSerializableCollection<T> {
     @Override public boolean equals(Object obj) {
         return obj instanceof Collection && GridFunc.eqNotOrdered(this, (Collection)obj);
     }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(ReadOnlyCollectionView.class, this);
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView2X.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView2X.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView2X.java
index 82ec651..7a60e17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView2X.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView2X.java
@@ -22,7 +22,6 @@ import java.util.Iterator;
 import org.apache.ignite.internal.util.GridSerializableCollection;
 import org.apache.ignite.internal.util.GridSerializableIterator;
 import org.apache.ignite.internal.util.lang.GridFunc;
-import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -92,9 +91,4 @@ public class ReadOnlyCollectionView2X<T> extends GridSerializableCollection<T> {
     @Override public boolean equals(Object obj) {
         return obj instanceof Collection && GridFunc.eqNotOrdered(this, (Collection<?>)obj);
     }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(ReadOnlyCollectionView2X.class, this);
-    }
 }