You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/17 15:15:02 UTC

ignite git commit: ignite-1561

Repository: ignite
Updated Branches:
  refs/heads/ignite-1561-1 045a1b29f -> 84df73e30


ignite-1561


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/84df73e3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/84df73e3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/84df73e3

Branch: refs/heads/ignite-1561-1
Commit: 84df73e3080e58fd8f79eb0343ca2453b621c34f
Parents: 045a1b2
Author: sboikov <sb...@gridgain.com>
Authored: Mon Apr 17 13:59:18 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Apr 17 18:09:28 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheUtils.java        |  60 +++++-
 .../distributed/GridDistributedTxMapping.java   |  66 +++++-
 ...arOptimisticSerializableTxPrepareFuture.java | 213 +++++++++++++------
 .../near/GridNearOptimisticTxPrepareFuture.java | 110 ++++++----
 .../GridNearPessimisticTxPrepareFuture.java     | 165 +++++++++-----
 .../near/GridNearTxPrepareFutureAdapter.java    |  31 +--
 .../cache/transactions/IgniteTxHandler.java     |  48 +----
 .../lang/gridfunc/TransformCollectionView.java  |   5 -
 .../dht/IgniteCrossCacheTxSelfTest.java         |  12 +-
 9 files changed, 473 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/84df73e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index d20a782..9ccc338 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -247,9 +247,9 @@ public class GridCacheUtils {
     private static final CacheEntryPredicate[] ALWAYS_FALSE0_ARR = new CacheEntryPredicate[] {ALWAYS_FALSE0};
 
     /** Read filter. */
-    private static final IgnitePredicate READ_FILTER = new P1<Object>() {
-        @Override public boolean apply(Object e) {
-            return ((IgniteTxEntry)e).op() == READ;
+    public static final IgnitePredicate READ_FILTER = new P1<IgniteTxEntry>() {
+        @Override public boolean apply(IgniteTxEntry e) {
+            return e.op() == READ;
         }
 
         @Override public String toString() {
@@ -257,10 +257,54 @@ public class GridCacheUtils {
         }
     };
 
+    /** Read filter. */
+    public static final IgnitePredicate READ_FILTER_NEAR = new P1<IgniteTxEntry>() {
+        @Override public boolean apply(IgniteTxEntry e) {
+            return e.op() == READ && e.context().isNear();
+        }
+
+        @Override public String toString() {
+            return "Cache transaction read filter";
+        }
+    };
+
+    /** Read filter. */
+    public static final IgnitePredicate READ_FILTER_COLOCATED = new P1<IgniteTxEntry>() {
+        @Override public boolean apply(IgniteTxEntry e) {
+            return e.op() == READ && !e.context().isNear();
+        }
+
+        @Override public String toString() {
+            return "Cache transaction read filter";
+        }
+    };
+
+    /** Write filter. */
+    public static final IgnitePredicate WRITE_FILTER = new P1<IgniteTxEntry>() {
+        @Override public boolean apply(IgniteTxEntry e) {
+            return e.op() != READ;
+        }
+
+        @Override public String toString() {
+            return "Cache transaction write filter";
+        }
+    };
+
+    /** Write filter. */
+    public static final IgnitePredicate WRITE_FILTER_NEAR = new P1<IgniteTxEntry>() {
+        @Override public boolean apply(IgniteTxEntry e) {
+            return e.op() != READ && e.context().isNear();
+        }
+
+        @Override public String toString() {
+            return "Cache transaction write filter";
+        }
+    };
+
     /** Write filter. */
-    private static final IgnitePredicate WRITE_FILTER = new P1<Object>() {
-        @Override public boolean apply(Object e) {
-            return ((IgniteTxEntry)e).op() != READ;
+    public static final IgnitePredicate WRITE_FILTER_COLOCATED = new P1<IgniteTxEntry>() {
+        @Override public boolean apply(IgniteTxEntry e) {
+            return e.op() != READ && !e.context().isNear();
         }
 
         @Override public String toString() {
@@ -613,7 +657,7 @@ public class GridCacheUtils {
      * @return Filter for transaction reads.
      */
     @SuppressWarnings({"unchecked"})
-    public static <K, V> IgnitePredicate<IgniteTxEntry> reads() {
+    public static IgnitePredicate<IgniteTxEntry> reads() {
         return READ_FILTER;
     }
 
@@ -621,7 +665,7 @@ public class GridCacheUtils {
      * @return Filter for transaction writes.
      */
     @SuppressWarnings({"unchecked"})
-    public static <K, V> IgnitePredicate<IgniteTxEntry> writes() {
+    public static IgnitePredicate<IgniteTxEntry> writes() {
         return WRITE_FILTER;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/84df73e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index 9d86244..a15c00a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -58,7 +58,7 @@ public class GridDistributedTxMapping {
     private boolean clientFirst;
 
     /** */
-    private boolean hasNear;
+    private int nearEntries;
 
     /**
      * @param primary Primary node.
@@ -97,8 +97,18 @@ public class GridDistributedTxMapping {
         this.clientFirst = clientFirst;
     }
 
+    /**
+     * @return {@code True} if has colocated cache entries.
+     */
+    public boolean hasColocatedCacheEntries() {
+        return entries.size() > nearEntries;
+    }
+
+    /**
+     * @return {@code True} if has near cache entries.
+     */
     public boolean hasNearCacheEntries() {
-        return hasNear;
+        return nearEntries > 0;
     }
 
     /**
@@ -119,7 +129,9 @@ public class GridDistributedTxMapping {
      * @return Near cache entries.
      */
     @Nullable public List<IgniteTxEntry> nearCacheEntries() {
-        assert hasNear;
+        assert nearEntries > 0;
+
+        // TODO IGNITE-1561.
 
         List<IgniteTxEntry> nearCacheEntries = new ArrayList<>();
 
@@ -167,24 +179,62 @@ public class GridDistributedTxMapping {
      * @return Reads.
      */
     public Collection<IgniteTxEntry> reads() {
-        return F.view(entries, CU.reads());
+        return F.view(entries, CU.READ_FILTER);
     }
 
     /**
      * @return Writes.
      */
     public Collection<IgniteTxEntry> writes() {
-        return F.view(entries, CU.writes());
+        return F.view(entries, CU.WRITE_FILTER);
+    }
+
+    /**
+     * @return Near cache reads.
+     */
+    public Collection<IgniteTxEntry> nearEntriesReads() {
+        assert hasNearCacheEntries();
+
+        return F.view(entries, CU.READ_FILTER_NEAR);
+    }
+
+    /**
+     * @return Near cache writes.
+     */
+    public Collection<IgniteTxEntry> nearEntriesWrites() {
+        assert hasNearCacheEntries();
+
+        return F.view(entries, CU.WRITE_FILTER_NEAR);
+    }
+
+    /**
+     * @return Colocated cache reads.
+     */
+    public Collection<IgniteTxEntry> colocatedEntriesReads() {
+        assert hasColocatedCacheEntries();
+
+        return F.view(entries, CU.READ_FILTER_COLOCATED);
+    }
+
+    /**
+     * @return Colocated cache writes.
+     */
+    public Collection<IgniteTxEntry> colocatedEntriesWrites() {
+        assert hasColocatedCacheEntries();
+
+        return F.view(entries, CU.WRITE_FILTER_COLOCATED);
     }
 
     /**
      * @param entry Adds entry.
      */
     public void add(IgniteTxEntry entry) {
-        if (entry.context().isNear())
-            hasNear = true;
+        boolean add = entries.add(entry);
+
+        assert add : entry;
 
-        entries.add(entry);
+        if (entry.context().isNear())
+            nearEntries++;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/84df73e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index a54c65d..1212155 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -199,7 +200,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             MiniFuture mini = miniFuture(res.miniId());
 
             if (mini != null)
-                mini.onResult(res);
+                mini.onResult(res, true);
         }
     }
 
@@ -339,14 +340,24 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
         GridDhtTxMapping txMapping = new GridDhtTxMapping();
 
-        Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
+        Map<UUID, GridDistributedTxMapping> mappings = new HashMap<>();
 
-        for (IgniteTxEntry write : writes)
+        boolean nearEntries = false;
+
+        for (IgniteTxEntry write : writes) {
             map(write, topVer, mappings, txMapping, remap, topLocked);
 
-        for (IgniteTxEntry read : reads)
+            if (write.context().isNear())
+                nearEntries = true;
+        }
+
+        for (IgniteTxEntry read : reads) {
             map(read, topVer, mappings, txMapping, remap, topLocked);
 
+            if (read.context().isNear())
+                nearEntries = true;
+        }
+
         if (keyLockFut != null)
             keyLockFut.onAllKeysAdded();
 
@@ -363,12 +374,26 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
         tx.transactionNodes(txMapping.transactionNodes());
 
-        checkOnePhase(txMapping);
+        if (!nearEntries)
+            checkOnePhase(txMapping);
+
+        MiniFuture locNearOnlyFut = null;
 
+        // Create futures in advance to have all futures when process {@link GridNearTxPrepareResponse#clientRemapVersion}.
         for (GridDistributedTxMapping m : mappings.values()) {
             assert !m.empty();
 
-            add(new MiniFuture(this, m, ++miniId));
+            MiniFuture fut = new MiniFuture(this, m, ++miniId);
+
+            add(fut);
+
+            if (m.primary().isLocal() && m.hasNearCacheEntries() && m.hasColocatedCacheEntries()) {
+                assert locNearOnlyFut == null;
+
+                locNearOnlyFut = fut;
+
+                add(new MiniFuture(this, m, ++miniId));
+            }
         }
 
         Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
@@ -383,7 +408,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
             MiniFuture fut = (MiniFuture)fut0;
 
-            IgniteCheckedException err = prepare(fut, txMapping);
+            IgniteCheckedException err = prepare(fut, txMapping, locNearOnlyFut);
 
             if (err != null) {
                 while (it.hasNext()) {
@@ -419,7 +444,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
      * @param fut Mini future.
      * @return Prepare error if any.
      */
-    @Nullable private IgniteCheckedException prepare(final MiniFuture fut, GridDhtTxMapping txMapping) {
+    @Nullable private IgniteCheckedException prepare(final MiniFuture fut,
+        GridDhtTxMapping txMapping,
+        @Nullable MiniFuture locNearOnlyFut) {
         GridDistributedTxMapping m = fut.mapping();
 
         final ClusterNode primary = m.primary();
@@ -434,36 +461,12 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             return err;
         }
 
-        GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
-            futId,
-            tx.topologyVersion(),
-            tx,
-            timeout,
-            m.reads(),
-            m.writes(),
-            m.hasNearCacheEntries(),
-            txMapping.transactionNodes(),
-            m.last(),
-            tx.onePhaseCommit(),
-            tx.needReturnValue() && tx.implicit(),
-            tx.implicitSingle(),
-            m.explicitLock(),
-            tx.subjectId(),
-            tx.taskNameHash(),
-            m.clientFirst(),
-            tx.activeCachesDeploymentEnabled());
-
-        for (IgniteTxEntry txEntry : m.entries()) {
-            if (txEntry.op() == TRANSFORM)
-                req.addDhtVersion(txEntry.txKey(), null);
-        }
-
         // Must lock near entries separately.
         if (m.hasNearCacheEntries()) {
             try {
                 tx.optimisticLockEntries(m.nearCacheEntries());
 
-                tx.userPrepare();
+                cctx.tm().prepareTx(tx);
             }
             catch (IgniteCheckedException e) {
                 fut.onResult(e);
@@ -472,27 +475,37 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             }
         }
 
-        req.miniId(fut.futureId());
-
         // If this is the primary node for the keys.
         if (primary.isLocal()) {
-            IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(primary.id(),
-                tx,
-                req);
-
-            prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
-                @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
-                    try {
-                        fut.onResult(prepFut.get());
-                    }
-                    catch (IgniteCheckedException e) {
-                        fut.onResult(e);
-                    }
-                }
-            });
+            if (locNearOnlyFut != null) {
+                boolean nearOnly = fut == locNearOnlyFut;
+
+                GridNearTxPrepareRequest req = createRequest(txMapping.transactionNodes(),
+                    fut,
+                    timeout,
+                    nearOnly ? m.nearEntriesReads() : m.colocatedEntriesReads(),
+                    nearOnly ? m.nearEntriesWrites() : m.colocatedEntriesWrites());
+
+                prepareLocal(req, fut, nearOnly, nearOnly);
+            }
+            else {
+                GridNearTxPrepareRequest req = createRequest(txMapping.transactionNodes(),
+                    fut,
+                    timeout,
+                    m.reads(),
+                    m.writes());
+
+                prepareLocal(req, fut, m.hasNearCacheEntries(), true);
+            }
         }
         else {
             try {
+                GridNearTxPrepareRequest req = createRequest(txMapping.transactionNodes(),
+                    fut,
+                    timeout,
+                    m.reads(),
+                    m.writes());
+
                 cctx.io().send(primary, req, tx.ioPolicy());
             }
             catch (ClusterTopologyCheckedException e) {
@@ -513,6 +526,79 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     }
 
     /**
+     * @param txNodes
+     * @param fut
+     * @param timeout
+     * @param reads
+     * @param writes
+     * @return Request.
+     */
+    private GridNearTxPrepareRequest createRequest(
+        Map<UUID, Collection<UUID>> txNodes,
+        MiniFuture fut,
+        long timeout,
+        Collection<IgniteTxEntry> reads,
+        Collection<IgniteTxEntry> writes) {
+        GridDistributedTxMapping m = fut.mapping();
+
+        GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
+            futId,
+            tx.topologyVersion(),
+            tx,
+            timeout,
+            reads,
+            writes,
+            m.hasNearCacheEntries(),
+            txNodes,
+            m.last(),
+            tx.onePhaseCommit(),
+            tx.needReturnValue() && tx.implicit(),
+            tx.implicitSingle(),
+            m.explicitLock(),
+            tx.subjectId(),
+            tx.taskNameHash(),
+            m.clientFirst(),
+            tx.activeCachesDeploymentEnabled());
+
+        for (IgniteTxEntry txEntry : m.entries()) {
+            if (txEntry.op() == TRANSFORM)
+                req.addDhtVersion(txEntry.txKey(), null);
+        }
+
+        req.miniId(fut.futureId());
+
+        return req;
+    }
+
+    /**
+     * @param req Request.
+     * @param nearTx Near cache mapping flag.
+     * @param updateMapping Update mapping flag.
+     */
+    private void prepareLocal(GridNearTxPrepareRequest req,
+        final MiniFuture fut,
+        final boolean nearTx,
+        final boolean updateMapping) {
+        if (nearTx)
+            req.cloneEntries();
+
+        IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = nearTx ?
+            cctx.tm().txHandler().prepareNearTx(cctx.localNodeId(), req, true) :
+            cctx.tm().txHandler().prepareColocatedTx(tx, req);
+
+        prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+            @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
+                try {
+                    fut.onResult(prepFut.get(), updateMapping);
+                }
+                catch (IgniteCheckedException e) {
+                    fut.onResult(e);
+                }
+            }
+        });
+    }
+
+    /**
      * @param entry Transaction entry.
      * @param topVer Topology version.
      * @param curMapping Current mapping.
@@ -522,7 +608,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     private void map(
         IgniteTxEntry entry,
         AffinityTopologyVersion topVer,
-        Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> curMapping,
+        Map<UUID, GridDistributedTxMapping> curMapping,
         GridDhtTxMapping txMapping,
         boolean remap,
         boolean topLocked
@@ -565,27 +651,25 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             }
         }
 
-        IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, cacheCtx.isNear());
-
-        GridDistributedTxMapping cur = curMapping.get(key);
+        GridDistributedTxMapping cur = curMapping.get(primary.id());
 
         if (cur == null) {
             cur = new GridDistributedTxMapping(primary);
 
-            curMapping.put(key, cur);
-
-            if (primary.isLocal()) {
-                if (entry.context().isNear())
-                    tx.nearLocallyMapped(true);
-                else if (entry.context().isColocated())
-                    tx.colocatedLocallyMapped(true);
-            }
+            curMapping.put(primary.id(), cur);
 
             cur.clientFirst(!topLocked && cctx.kernalContext().clientNode());
 
             cur.last(true);
         }
 
+        if (primary.isLocal()) {
+            if (cacheCtx.isNear())
+                tx.nearLocallyMapped(true);
+            else if (cacheCtx.isColocated())
+                tx.colocatedLocallyMapped(true);
+        }
+
         cur.add(entry);
 
         if (entry.explicitVersion() != null) {
@@ -772,7 +856,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
          * @param res Result callback.
          */
         @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
-        void onResult(final GridNearTxPrepareResponse res) {
+        void onResult(final GridNearTxPrepareResponse res, boolean updateMapping) {
             if (isDone())
                 return;
 
@@ -875,7 +959,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                             onDone(res);
                     }
                     else {
-                        parent.onPrepareResponse(m, res);
+                        parent.onPrepareResponse(m, res, updateMapping);
 
                         // Finish this mini future (need result only on client node).
                         onDone(parent.cctx.kernalContext().clientNode() ? res : null);
@@ -889,8 +973,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
          */
         private void remap(final GridNearTxPrepareResponse res) {
             parent.prepareOnTopology(true, new Runnable() {
-                @Override
-                public void run() {
+                @Override public void run() {
                     onDone(res);
                 }
             });

http://git-wip-us.apache.org/repos/asf/ignite/blob/84df73e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 26af91a..fc4d8c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -180,7 +180,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
             if (mini != null) {
                 assert mini.node().id().equals(nodeId);
 
-                mini.onResult(res);
+                mini.onResult(res, true);
             }
             else {
                 if (msgLog.isDebugEnabled()) {
@@ -477,6 +477,60 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
         proceedPrepare(m, mappings);
     }
 
+    private void prepareLocal(GridNearTxPrepareRequest req,
+        final MiniFuture fut,
+        boolean nearTx,
+        final boolean updateMapping) {
+        IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = nearTx ?
+            cctx.tm().txHandler().prepareNearTx(cctx.localNodeId(), req, true) :
+            cctx.tm().txHandler().prepareColocatedTx(tx, req);
+
+        prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+            @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
+                try {
+                    fut.onResult(prepFut.get(), updateMapping);
+                }
+                catch (IgniteCheckedException e) {
+                    fut.onResult(e);
+                }
+            }
+        });
+    }
+
+    private GridNearTxPrepareRequest createRequest(long timeout,
+        MiniFuture fut,
+        Collection<IgniteTxEntry> writes) {
+        GridDistributedTxMapping m = fut.mapping();
+
+        GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
+            futId,
+            tx.topologyVersion(),
+            tx,
+            timeout,
+            null,
+            writes,
+            m.hasNearCacheEntries(),
+            txMapping.transactionNodes(),
+            m.last(),
+            tx.onePhaseCommit(),
+            tx.needReturnValue() && tx.implicit(),
+            tx.implicitSingle(),
+            m.explicitLock(),
+            tx.subjectId(),
+            tx.taskNameHash(),
+            m.clientFirst(),
+            tx.activeCachesDeploymentEnabled());
+
+        for (IgniteTxEntry txEntry : m.entries()) {
+            if (txEntry.op() == TRANSFORM)
+                req.addDhtVersion(txEntry.txKey(), null);
+        }
+
+        req.miniId(fut.futureId());
+
+        return req;
+    }
+
     /**
      * Continues prepare after previous mapping successfully finished.
      *
@@ -497,36 +551,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
             long timeout = tx.remainingTime();
 
             if (timeout != -1) {
-                GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
-                    futId,
-                    tx.topologyVersion(),
-                    tx,
-                    timeout,
-                    null,
-                    m.writes(),
-                    m.hasNearCacheEntries(),
-                    txMapping.transactionNodes(),
-                    m.last(),
-                    tx.onePhaseCommit(),
-                    tx.needReturnValue() && tx.implicit(),
-                    tx.implicitSingle(),
-                    m.explicitLock(),
-                    tx.subjectId(),
-                    tx.taskNameHash(),
-                    m.clientFirst(),
-                    tx.activeCachesDeploymentEnabled());
-
-                for (IgniteTxEntry txEntry : m.entries()) {
-                    if (txEntry.op() == TRANSFORM)
-                        req.addDhtVersion(txEntry.txKey(), null);
-                }
-
                 // Must lock near entries separately.
                 if (m.hasNearCacheEntries()) {
                     try {
                         tx.optimisticLockEntries(m.nearCacheEntries());
 
-                        tx.userPrepare();
+                        cctx.tm().prepareTx(tx);
                     }
                     catch (IgniteCheckedException e) {
                         onError(e, false);
@@ -535,31 +565,17 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
 
                 final MiniFuture fut = new MiniFuture(this, m, ++miniId, mappings);
 
-                req.miniId(fut.futureId());
-
                 add(fut); // Append new future.
 
-                // If this is the primary node for the keys.
                 if (n.isLocal()) {
-                    // At this point, if any new node joined, then it is
-                    // waiting for this transaction to complete, so
-                    // partition reassignments are not possible here.
-                    IgniteInternalFuture<GridNearTxPrepareResponse> prepFut =
-                        cctx.tm().txHandler().prepareTx(n.id(), tx, req);
-
-                    prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
-                        @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
-                            try {
-                                fut.onResult(prepFut.get());
-                            }
-                            catch (IgniteCheckedException e) {
-                                fut.onResult(e);
-                            }
-                        }
-                    });
+                    GridNearTxPrepareRequest req = createRequest(timeout, fut, m.writes());
+
+                    prepareLocal(req, fut, m.hasNearCacheEntries(), true);
                 }
                 else {
                     try {
+                        GridNearTxPrepareRequest req = createRequest(timeout, fut, m.writes());
+
                         cctx.io().send(n, req, tx.ioPolicy());
 
                         if (msgLog.isDebugEnabled()) {
@@ -870,7 +886,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
          * @param res Result callback.
          */
         @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-        void onResult(final GridNearTxPrepareResponse res) {
+        void onResult(final GridNearTxPrepareResponse res, boolean updateMapping) {
             if (isDone())
                 return;
 
@@ -912,7 +928,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                             remap();
                     }
                     else {
-                        parent.onPrepareResponse(m, res);
+                        parent.onPrepareResponse(m, res, updateMapping);
 
                         // Proceed prepare before finishing mini future.
                         if (mappings != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/84df73e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index f6e7d2a..ee5790f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -44,7 +44,6 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -102,7 +101,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
             if (f != null) {
                 assert f.primary().id().equals(nodeId);
 
-                f.onResult(res);
+                f.onResult(res, true);
             }
             else {
                 if (msgLog.isDebugEnabled()) {
@@ -181,6 +180,80 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
     }
 
     /**
+     * @param txNodes Tx nodes.
+     * @param m Mapping.
+     * @param timeout Timeout.
+     * @param reads Reads.
+     * @param writes Writes.
+     * @return Request.
+     */
+    private GridNearTxPrepareRequest createRequest(Map<UUID, Collection<UUID>> txNodes,
+        GridDistributedTxMapping m,
+        long timeout,
+        Collection<IgniteTxEntry> reads,
+        Collection<IgniteTxEntry> writes) {
+        GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
+            futId,
+            tx.topologyVersion(),
+            tx,
+            timeout,
+            reads,
+            writes,
+            m.hasNearCacheEntries(),
+            txNodes,
+            true,
+            tx.onePhaseCommit(),
+            tx.needReturnValue() && tx.implicit(),
+            tx.implicitSingle(),
+            m.explicitLock(),
+            tx.subjectId(),
+            tx.taskNameHash(),
+            false,
+            tx.activeCachesDeploymentEnabled());
+
+        for (IgniteTxEntry txEntry : m.entries()) {
+            if (txEntry.op() == TRANSFORM)
+                req.addDhtVersion(txEntry.txKey(), null);
+        }
+
+        return req;
+    }
+
+    /**
+     * @param req Request.
+     * @param m Mapping.
+     * @param miniId Mini future ID.
+     * @param nearTx Near cache mapping flag.
+     * @param updateMapping Update mapping flag.
+     */
+    private void prepareLocal(GridNearTxPrepareRequest req,
+        GridDistributedTxMapping m,
+        int miniId,
+        final boolean nearTx,
+        final boolean updateMapping) {
+        final MiniFuture fut = new MiniFuture(m, miniId);
+
+        req.miniId(fut.futureId());
+
+        add(fut);
+
+        IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = nearTx ?
+            cctx.tm().txHandler().prepareNearTx(cctx.localNodeId(), req, true) :
+            cctx.tm().txHandler().prepareColocatedTx(tx, req);
+
+        prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+            @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
+                try {
+                    fut.onResult(prepFut.get(), updateMapping);
+                }
+                catch (IgniteCheckedException e) {
+                    fut.onError(e);
+                }
+            }
+        });
+    }
+
+    /**
      *
      */
     private void preparePessimistic() {
@@ -216,11 +289,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
             GridDistributedTxMapping nodeMapping = mappings.get(primary.id());
 
-            if (nodeMapping == null) {
-                nodeMapping = new GridDistributedTxMapping(primary);
-
-                mappings.put(primary.id(), nodeMapping);
-            }
+            if (nodeMapping == null)
+                mappings.put(primary.id(), nodeMapping = new GridDistributedTxMapping(primary));
 
             txEntry.nodeId(primary.id());
 
@@ -244,56 +314,48 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
         int miniId = 0;
 
+        Map<UUID, Collection<UUID>> txNodes = txMapping.transactionNodes();
+
         for (final GridDistributedTxMapping m : mappings.values()) {
             final ClusterNode primary = m.primary();
 
-            GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
-                futId,
-                tx.topologyVersion(),
-                tx,
-                timeout,
-                m.reads(),
-                m.writes(),
-                m.hasNearCacheEntries(),
-                txMapping.transactionNodes(),
-                true,
-                tx.onePhaseCommit(),
-                tx.needReturnValue() && tx.implicit(),
-                tx.implicitSingle(),
-                m.explicitLock(),
-                tx.subjectId(),
-                tx.taskNameHash(),
-                false,
-                tx.activeCachesDeploymentEnabled());
-
-            for (IgniteTxEntry txEntry : m.entries()) {
-                if (txEntry.op() == TRANSFORM)
-                    req.addDhtVersion(txEntry.txKey(), null);
+            if (primary.isLocal()) {
+                if (m.hasNearCacheEntries() && m.hasColocatedCacheEntries()) {
+                    GridNearTxPrepareRequest nearReq = createRequest(txMapping.transactionNodes(),
+                        m,
+                        timeout,
+                        m.nearEntriesReads(),
+                        m.nearEntriesWrites());
+
+                    prepareLocal(nearReq, m, ++miniId, true, true);
+
+                    GridNearTxPrepareRequest colocatedReq = createRequest(txNodes,
+                        m,
+                        timeout,
+                        m.colocatedEntriesReads(),
+                        m.colocatedEntriesWrites());
+
+                    prepareLocal(colocatedReq, m, ++miniId, false, false);
+                }
+                else {
+                    GridNearTxPrepareRequest req = createRequest(txNodes, m, timeout, m.reads(), m.writes());
+
+                    prepareLocal(req, m, ++miniId, m.hasNearCacheEntries(), true);
+                }
             }
+            else {
+                GridNearTxPrepareRequest req = createRequest(txNodes,
+                    m,
+                    timeout,
+                    m.reads(),
+                    m.writes());
 
-            final MiniFuture fut = new MiniFuture(m, ++miniId);
+                final MiniFuture fut = new MiniFuture(m, ++miniId);
 
-            req.miniId(fut.futureId());
+                req.miniId(fut.futureId());
 
-            add(fut);
+                add(fut);
 
-            if (primary.isLocal()) {
-                IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(primary.id(),
-                    tx,
-                    req);
-
-                prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
-                    @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
-                        try {
-                            fut.onResult(prepFut.get());
-                        }
-                        catch (IgniteCheckedException e) {
-                            fut.onError(e);
-                        }
-                    }
-                });
-            }
-            else {
                 try {
                     cctx.io().send(primary, req, tx.ioPolicy());
 
@@ -397,12 +459,13 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
         /**
          * @param res Response.
+         * @param updateMapping Update mapping flag.
          */
-        void onResult(GridNearTxPrepareResponse res) {
+        void onResult(GridNearTxPrepareResponse res, boolean updateMapping) {
             if (res.error() != null)
                 onError(res.error());
             else {
-                onPrepareResponse(m, res);
+                onPrepareResponse(m, res, updateMapping);
 
                 onDone(res);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/84df73e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index f35324a..a9675d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -180,9 +180,12 @@ public abstract class GridNearTxPrepareFutureAdapter extends
     /**
      * @param m Mapping.
      * @param res Response.
+     * @param updateMapping Update mapping flag.
      */
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-    final void onPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
+    final void onPrepareResponse(GridDistributedTxMapping m,
+        GridNearTxPrepareResponse res,
+        boolean updateMapping) {
         if (res == null)
             return;
 
@@ -245,24 +248,26 @@ public abstract class GridNearTxPrepareFutureAdapter extends
         }
 
         if (!m.empty()) {
-            GridCacheVersion writeVer = res.writeVersion();
-
-            if (writeVer == null)
-                writeVer = res.dhtVersion();
-
             // This step is very important as near and DHT versions grow separately.
             cctx.versions().onReceived(nodeId, res.dhtVersion());
 
-            // Register DHT version.
-            m.dhtVersion(res.dhtVersion(), writeVer);
+            if (updateMapping) {
+                GridCacheVersion writeVer = res.writeVersion();
+
+                if (writeVer == null)
+                    writeVer = res.dhtVersion();
 
-            GridDistributedTxMapping map = tx.mappings().get(nodeId);
+                // Register DHT version.
+                m.dhtVersion(res.dhtVersion(), writeVer);
 
-            if (map != null)
-                map.dhtVersion(res.dhtVersion(), writeVer);
+                GridDistributedTxMapping map = tx.mappings().get(nodeId);
 
-            if (m.hasNearCacheEntries())
-                tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
+                if (map != null)
+                    map.dhtVersion(res.dhtVersion(), writeVer);
+
+                if (m.hasNearCacheEntries())
+                    tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
+            }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/84df73e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 153ad04..00a991e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -111,20 +111,17 @@ public class IgniteTxHandler {
     /**
      * @param nearNodeId Node ID.
      * @param req Request.
-     * @return Prepare future.
      */
-    private IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) {
+    private void processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) {
         if (txPrepareMsgLog.isDebugEnabled()) {
             txPrepareMsgLog.debug("Received near prepare request [txId=" + req.version() +
                 ", node=" + nearNodeId + ']');
         }
 
-        IgniteInternalFuture<GridNearTxPrepareResponse> fut = prepareTx(nearNodeId, null, req);
+        IgniteInternalFuture<GridNearTxPrepareResponse> fut = prepareNearTx(nearNodeId, req, false);
 
         assert req.txState() != null || fut == null || fut.error() != null ||
             (ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null);
-
-        return fut;
     }
 
     /**
@@ -209,41 +206,13 @@ public class IgniteTxHandler {
     }
 
     /**
-     * @param nearNodeId Near node ID that initiated transaction.
-     * @param locTx Optional local transaction.
-     * @param req Near prepare request.
-     * @return Future for transaction.
-     */
-    public IgniteInternalFuture<GridNearTxPrepareResponse> prepareTx(
-        UUID nearNodeId,
-        @Nullable GridNearTxLocal locTx,
-        GridNearTxPrepareRequest req
-    ) {
-        assert nearNodeId != null;
-        assert req != null;
-
-        if (locTx != null) {
-            if (req.near()) {
-                // Make sure not to provide Near entries to DHT cache.
-                req.cloneEntries();
-
-                return prepareNearTx(nearNodeId, req);
-            }
-            else
-                return prepareColocatedTx(locTx, req);
-        }
-        else
-            return prepareNearTx(nearNodeId, req);
-    }
-
-    /**
      * Prepares local colocated tx.
      *
      * @param locTx Local transaction.
      * @param req Near prepare request.
      * @return Prepare future.
      */
-    private IgniteInternalFuture<GridNearTxPrepareResponse> prepareColocatedTx(
+    public IgniteInternalFuture<GridNearTxPrepareResponse> prepareColocatedTx(
         final GridNearTxLocal locTx,
         final GridNearTxPrepareRequest req
     ) {
@@ -308,16 +277,19 @@ public class IgniteTxHandler {
     }
 
     /**
-     * Prepares near transaction.
-     *
      * @param nearNodeId Near node ID that initiated transaction.
      * @param req Near prepare request.
+     * @param locReq Local request flag.
      * @return Prepare future.
      */
-    private IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx(
+    public IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx(
         final UUID nearNodeId,
-        final GridNearTxPrepareRequest req
+        final GridNearTxPrepareRequest req,
+        boolean locReq
     ) {
+        if (locReq)
+            req.cloneEntries();
+
         ClusterNode nearNode = ctx.node(nearNodeId);
 
         if (nearNode == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/84df73e3/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/TransformCollectionView.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/TransformCollectionView.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/TransformCollectionView.java
index 2c317df..735f2d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/TransformCollectionView.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/TransformCollectionView.java
@@ -71,9 +71,4 @@ public class TransformCollectionView<T1, T2> extends GridSerializableCollection<
     @Override public boolean isEmpty() {
         return F.isEmpty(preds) ? col.isEmpty() : !iterator().hasNext();
     }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(TransformCollectionView.class, this);
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/84df73e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCrossCacheTxSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCrossCacheTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCrossCacheTxSelfTest.java
index 273f0ca..1cac85a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCrossCacheTxSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCrossCacheTxSelfTest.java
@@ -40,6 +40,7 @@ import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
 
 /**
  * Tests specific combinations of cross-cache transactions.
@@ -70,14 +71,14 @@ public class IgniteCrossCacheTxSelfTest extends GridCommonAbstractTest {
      * @return Node count for this test.
      */
     private int nodeCount() {
-        return 2;
+        return 4;
     }
 
     /**
      * @return {@code True} if near cache should be enabled.
      */
     protected boolean nearEnabled() {
-        return false;
+        return true;
     }
 
     /** {@inheritDoc} */
@@ -137,6 +138,13 @@ public class IgniteCrossCacheTxSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticSerializable() throws Exception {
+        checkTxsSingleOp(OPTIMISTIC, SERIALIZABLE);
+    }
+
+    /**
      * @param concurrency Concurrency.
      * @param isolation Isolation.
      * @throws Exception If failed.