You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/21 13:30:13 UTC

incubator-ignite git commit: # ignite-23 remap for tx updates from client

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-23 12761e440 -> d12dd4173


# ignite-23 remap for tx updates from client


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d12dd417
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d12dd417
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d12dd417

Branch: refs/heads/ignite-23
Commit: d12dd41736ead57d0f68aa211edde0b4922733fc
Parents: 12761e4
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 21 11:54:43 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 21 14:29:58 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    |   5 +-
 .../GridCachePartitionExchangeManager.java      |  44 ++--
 .../distributed/GridDistributedTxMapping.java   |  17 ++
 .../distributed/dht/GridDhtLockFuture.java      |  10 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |  10 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |   8 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   3 +-
 .../dht/colocated/GridDhtColocatedCache.java    |  12 +-
 .../colocated/GridDhtColocatedLockFuture.java   |   8 +-
 .../GridDhtPartitionsExchangeFuture.java        |  16 +-
 .../distributed/near/GridNearLockRequest.java   |   1 +
 .../near/GridNearOptimisticTxPrepareFuture.java |  54 +++--
 .../GridNearPessimisticTxPrepareFuture.java     |   5 +-
 .../cache/distributed/near/GridNearTxLocal.java |  16 ++
 .../near/GridNearTxPrepareRequest.java          |  72 +++++--
 .../near/GridNearTxPrepareResponse.java         |  68 +++++--
 .../cache/transactions/IgniteInternalTx.java    |   5 +
 .../cache/transactions/IgniteTxAdapter.java     |  15 +-
 .../cache/transactions/IgniteTxHandler.java     | 117 ++++++++---
 ...niteCacheClientNodeChangingTopologyTest.java | 200 ++++++++++++++++++-
 20 files changed, 532 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 02f16c0..b8a4c87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -488,7 +488,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     req.miniId(),
                     req.version(),
                     req.version(),
-                    null, null, null);
+                    null,
+                    null,
+                    null,
+                    false);
 
                 res.error(req.classError());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 25e18db..41a13ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -706,9 +706,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         GridDhtPartitionsExchangeFuture old = exchFuts.addx(
             fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs));
 
-        if (old != null)
+        if (old != null) {
             fut = old;
 
+            if (reqs != null)
+                fut.cacheChangeRequests(reqs);
+        }
+
         if (discoEvt != null)
             fut.onEvent(exchId, discoEvt);
 
@@ -870,17 +874,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             }
             else {
                 if (msg.client()) {
-                    IgniteInternalFuture<?> fut = affinityReadyFuture(msg.exchangeId().topologyVersion());
-
-                    if (fut != null) {
-                        fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                            @Override public void apply(IgniteInternalFuture<?> fut) {
-                                processSinglePartitionClientUpdate(node, msg);
-                            }
-                        });
-                    }
-                    else
-                        processSinglePartitionClientUpdate(node, msg);
+                    final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
+                        null,
+                        null);
+
+                    exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                            // Finished future should reply only to sender client node.
+                            exchFut.onReceive(node.id(), msg);
+                        }
+                    });
                 }
                 else
                     exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
@@ -892,23 +895,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param node Node.
-     * @param msg Message.
-     */
-    private void processSinglePartitionClientUpdate(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
-        final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
-            null,
-            null);
-
-        exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-            @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                // Finished future should reply only to sender client node.
-                exchFut.onReceive(node.id(), msg);
-            }
-        });
-    }
-
-    /**
      * @param node Node ID.
      * @param msg Message.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index fded3c9..bd1dedf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -63,6 +63,9 @@ public class GridDistributedTxMapping implements Externalizable {
     /** {@code True} if mapping is for near caches, {@code false} otherwise. */
     private boolean near;
 
+    /** {@code True} if this is first mapping for optimistic tx on client node. */
+    private boolean clientFirst;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -108,6 +111,20 @@ public class GridDistributedTxMapping implements Externalizable {
     }
 
     /**
+     * @return {@code True} if this is first mapping for optimistic tx on client node.
+     */
+    public boolean clientFirst() {
+        return clientFirst;
+    }
+
+    /**
+     * @param clientFirst {@code True} if this is first mapping for optimistic tx on client node.
+     */
+    public void clientFirst(boolean clientFirst) {
+        this.clientFirst = clientFirst;
+    }
+
+    /**
      * @return {@code True} if mapping is for near caches, {@code false} otherwise.
      */
     public boolean near() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index c57eded..bdaa552 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -47,7 +47,7 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*;
 /**
  * Cache lock future.
  */
-public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
+public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
     implements GridCacheMvccFuture<Boolean>, GridDhtFuture<Boolean>, GridCacheMappedVersion {
     /** */
     private static final long serialVersionUID = 0L;
@@ -60,7 +60,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
 
     /** Cache registry. */
     @GridToStringExclude
-    private GridCacheContext<K, V> cctx;
+    private GridCacheContext<?, ?> cctx;
 
     /** Near node ID. */
     private UUID nearNodeId;
@@ -151,7 +151,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
      * @param skipStore Skip store flag.
      */
     public GridDhtLockFuture(
-        GridCacheContext<K, V> cctx,
+        GridCacheContext<?, ?> cctx,
         UUID nearNodeId,
         GridCacheVersion nearLockVer,
         @NotNull AffinityTopologyVersion topVer,
@@ -221,7 +221,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
      * @param cacheCtx Cache context.
      * @param invalidPart Partition to retry.
      */
-    void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int invalidPart) {
+    void addInvalidPartition(GridCacheContext<?, ?> cacheCtx, int invalidPart) {
         invalidParts.add(invalidPart);
 
         // Register invalid partitions with transaction.
@@ -1170,7 +1170,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
          * @param entries Entries to check.
          */
         @SuppressWarnings({"ForLoopReplaceableByForEach"})
-        private void evictReaders(GridCacheContext<K, V> cacheCtx, Collection<IgniteTxKey> keys, UUID nodeId, long msgId,
+        private void evictReaders(GridCacheContext<?, ?> cacheCtx, Collection<IgniteTxKey> keys, UUID nodeId, long msgId,
             @Nullable List<GridDhtCacheEntry> entries) {
             if (entries == null || keys == null || entries.isEmpty() || keys.isEmpty())
                 return;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 26eef50..bda0868 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -518,7 +518,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
             return;
         }
 
-        // Group lock can be only started from local node, so we never start group lock transaction on remote node.
         IgniteInternalFuture<?> f = lockAllAsync(ctx, nearNode, req, null);
 
         // Register listener just so we print out errors.
@@ -534,8 +533,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
     private void processDhtLockResponse(UUID nodeId, GridDhtLockResponse res) {
         assert nodeId != null;
         assert res != null;
-        GridDhtLockFuture<K, V> fut = (GridDhtLockFuture<K, V>)ctx.mvcc().<Boolean>future(res.version(),
-            res.futureId());
+        GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().<Boolean>future(res.version(), res.futureId());
 
         if (fut == null) {
             if (log.isDebugEnabled())
@@ -604,7 +602,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
         assert tx != null;
 
-        GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(
+        GridDhtLockFuture fut = new GridDhtLockFuture(
             ctx,
             tx.nearNodeId(),
             tx.nearXidVersion(),
@@ -719,10 +717,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                         if (filter == null)
                             filter = req.filter();
 
-                        GridDhtLockFuture<K, V> fut = null;
+                        GridDhtLockFuture fut = null;
 
                         if (!req.inTx()) {
-                            fut = new GridDhtLockFuture<>(ctx,
+                            fut = new GridDhtLockFuture(ctx,
                                 nearNode.id(),
                                 req.version(),
                                 req.topologyVersion(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 54b59b8..90edb0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -52,15 +52,13 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     private static final long serialVersionUID = 0L;
 
     /** Near mappings. */
-    protected Map<UUID, GridDistributedTxMapping> nearMap =
-        new ConcurrentHashMap8<>();
+    protected Map<UUID, GridDistributedTxMapping> nearMap = new ConcurrentHashMap8<>();
 
     /** DHT mappings. */
-    protected Map<UUID, GridDistributedTxMapping> dhtMap =
-        new ConcurrentHashMap8<>();
+    protected Map<UUID, GridDistributedTxMapping> dhtMap = new ConcurrentHashMap8<>();
 
     /** Mapped flag. */
-    private AtomicBoolean mapped = new AtomicBoolean();
+    protected AtomicBoolean mapped = new AtomicBoolean();
 
     /** */
     private long dhtThreadId;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 293cf95..aed00ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -582,7 +582,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
             tx.writeVersion(),
             tx.invalidPartitions(),
             ret,
-            prepErr);
+            prepErr,
+            false);
 
         if (prepErr == null) {
             addDhtValues(res);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 05b3c7b..221b230 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -362,13 +362,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         @Nullable TransactionIsolation isolation,
         long accessTtl
     ) {
-        assert tx == null || tx instanceof GridNearTxLocal;
+        assert tx == null || tx instanceof GridNearTxLocal : tx;
 
         GridNearTxLocal txx = (GridNearTxLocal)tx;
 
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
-        GridDhtColocatedLockFuture<K, V> fut = new GridDhtColocatedLockFuture<>(ctx,
+        GridDhtColocatedLockFuture fut = new GridDhtColocatedLockFuture(ctx,
             keys,
             txx,
             isRead,
@@ -619,7 +619,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @return Lock future.
      */
     IgniteInternalFuture<Exception> lockAllAsync(
-        final GridCacheContext<K, V> cacheCtx,
+        final GridCacheContext<?, ?> cacheCtx,
         @Nullable final GridNearTxLocal tx,
         final long threadId,
         final GridCacheVersion ver,
@@ -700,7 +700,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @return Lock future.
      */
     private IgniteInternalFuture<Exception> lockAllAsync0(
-        GridCacheContext<K, V> cacheCtx,
+        GridCacheContext<?, ?> cacheCtx,
         @Nullable final GridNearTxLocal tx,
         long threadId,
         final GridCacheVersion ver,
@@ -715,7 +715,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         int cnt = keys.size();
 
         if (tx == null) {
-            GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(ctx,
+            GridDhtLockFuture fut = new GridDhtLockFuture(ctx,
                 ctx.localNodeId(),
                 ver,
                 topVer,
@@ -838,7 +838,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         assert nodeId != null;
         assert res != null;
 
-        GridDhtColocatedLockFuture<K, V> fut = (GridDhtColocatedLockFuture<K, V>)ctx.mvcc().
+        GridDhtColocatedLockFuture fut = (GridDhtColocatedLockFuture)ctx.mvcc().
             <Boolean>future(res.version(), res.futureId());
 
         if (fut != null)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 372c517..9d19152 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -46,7 +46,7 @@ import static org.apache.ignite.events.EventType.*;
 /**
  * Colocated cache lock future.
  */
-public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
+public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture<Boolean>
     implements GridCacheFuture<Boolean> {
     /** */
     private static final long serialVersionUID = 0L;
@@ -59,7 +59,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
 
     /** Cache registry. */
     @GridToStringExclude
-    private GridCacheContext<K, V> cctx;
+    private GridCacheContext<?, ?> cctx;
 
     /** Lock owner thread. */
     @GridToStringInclude
@@ -121,10 +121,10 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
      * @param timeout Lock acquisition timeout.
      * @param accessTtl TTL for read operation.
      * @param filter Filter.
-     * @param skipStore
+     * @param skipStore Skip store flag.
      */
     public GridDhtColocatedLockFuture(
-        GridCacheContext<K, V> cctx,
+        GridCacheContext<?, ?> cctx,
         Collection<KeyCacheObject> keys,
         @Nullable GridNearTxLocal tx,
         boolean read,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 94ca540..af7fa5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -229,15 +229,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
         initFut = new GridFutureAdapter<>();
 
-        // Grab all nodes with order of equal or less than last joined node.
-        ClusterNode node = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion());
-
-        oldestNode.set(node);
-
         if (log.isDebugEnabled())
             log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + ", fut=" + this + ']');
     }
 
+    /**
+     * @param reqs Cache change requests.
+     */
+    public void cacheChangeRequests(Collection<DynamicCacheChangeRequest> reqs) {
+        this.reqs = reqs;
+    }
+
     /** {@inheritDoc} */
     @Override public GridDiscoveryTopologySnapshot topologySnapshot() throws IgniteCheckedException {
         get();
@@ -461,6 +463,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 assert discoEvt != null : this;
                 assert !dummy && !forcePreload : this;
 
+                ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion());
+
+                oldestNode.set(oldest);
+
                 startCaches();
 
                 // True if client node joined or failed.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index e71dd65..35ef2e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -98,6 +98,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
      * @param implicitTx Flag to indicate that transaction is implicit.
      * @param implicitSingleTx Implicit-transaction-with-one-key flag.
      * @param isRead Indicates whether implicit lock is for read or write operation.
+     * @param retVal Return value flag.
      * @param isolation Transaction isolation.
      * @param isInvalidate Invalidation flag.
      * @param timeout Lock timeout.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 4f74303..713b713 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -221,18 +221,18 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
         if (topVer != null) {
             tx.topologyVersion(topVer);
 
-            prepare0();
+            prepare0(false);
 
             return;
         }
 
-        prepareOnTopology();
+        prepareOnTopology(false);
     }
 
     /**
-     *
+     * @param remap Remap flag.
      */
-    private void prepareOnTopology() {
+    private void prepareOnTopology(final boolean remap) {
         GridDhtTopologyFuture topFut = topologyReadLock();
 
         try {
@@ -265,16 +265,19 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
                     return;
                 }
 
-                tx.topologyVersion(topFut.topologyVersion());
+                if (remap)
+                    tx.onRemap(topFut.topologyVersion());
+                else
+                    tx.topologyVersion(topFut.topologyVersion());
 
-                prepare0();
+                prepare0(remap);
             }
             else {
                 topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                     @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
                         cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
                             @Override public void run() {
-                                prepareOnTopology();
+                                prepareOnTopology(remap);
                             }
                         });
                     }
@@ -346,10 +349,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
 
     /**
      * Initializes future.
+     *
+     * @param remap Remap flag.
      */
-    private void prepare0() {
+    private void prepare0(boolean remap) {
         try {
-            if (!tx.state(PREPARING)) {
+            boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING);
+
+            if (!txStateCheck) {
                 if (tx.setRollbackOnly()) {
                     if (tx.timedOut())
                         onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
@@ -366,7 +373,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
             }
 
             // Make sure to add future before calling prepare.
-            cctx.mvcc().addFuture(this);
+            if (!remap)
+                cctx.mvcc().addFuture(this);
 
             prepare(
                 tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
@@ -502,7 +510,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
             tx.implicitSingle(),
             m.explicitLock(),
             tx.subjectId(),
-            tx.taskNameHash());
+            tx.taskNameHash(),
+            m.clientFirst());
 
         for (IgniteTxEntry txEntry : m.writes()) {
             if (txEntry.op() == TRANSFORM)
@@ -560,13 +569,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
      * @param entry Transaction entry.
      * @param topVer Topology version.
      * @param cur Current mapping.
+     * @param waitLock Wait lock flag.
      * @throws IgniteCheckedException If transaction is group-lock and local node is not primary for key.
      * @return Mapping.
      */
     private GridDistributedTxMapping map(
         IgniteTxEntry entry,
         AffinityTopologyVersion topVer,
-        GridDistributedTxMapping cur,
+        @Nullable GridDistributedTxMapping cur,
         boolean waitLock
     ) throws IgniteCheckedException {
         GridCacheContext cacheCtx = entry.context();
@@ -599,10 +609,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
         }
 
         if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
+            boolean clientFirst = cur == null && cctx.kernalContext().clientNode();
+
             cur = new GridDistributedTxMapping(primary);
 
             // Initialize near flag right away.
             cur.near(cacheCtx.isNear());
+
+            cur.clientFirst(clientFirst);
         }
 
         cur.add(entry);
@@ -748,11 +762,19 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
                     onError(nodeId, mappings, res.error());
                 }
                 else {
-                    onPrepareResponse(m, res);
+                    if (res.clientRemap()) {
+                        assert cctx.kernalContext().clientNode();
+                        assert m.clientFirst();
+
+                        prepareOnTopology(true);
+                    }
+                    else {
+                        onPrepareResponse(m, res);
 
-                    // Proceed prepare before finishing mini future.
-                    if (mappings != null)
-                        proceedPrepare(mappings);
+                        // Proceed prepare before finishing mini future.
+                        if (mappings != null)
+                            proceedPrepare(mappings);
+                    }
 
                     // Finish this mini future.
                     onDone(tx);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index bce62c1..1d06860 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -84,6 +84,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
     /** {@inheritDoc} */
     @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
         if (!isDone()) {
+            assert !res.clientRemap() : res;
+
             for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
                 MiniFuture f = (MiniFuture)fut;
 
@@ -187,7 +189,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                 tx.implicitSingle(),
                 m.explicitLock(),
                 tx.subjectId(),
-                tx.taskNameHash());
+                tx.taskNameHash(),
+                false);
 
             for (IgniteTxEntry txEntry : m.writes()) {
                 if (txEntry.op() == TRANSFORM)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index c38965d..e7664c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1185,6 +1185,22 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public void onRemap(AffinityTopologyVersion topVer) {
+        assert cctx.kernalContext().clientNode();
+
+        mapped.set(false);
+        nearLocallyMapped = false;
+        colocatedLocallyMapped = false;
+        txNodes = null;
+        onePhaseCommit = false;
+        nearMap.clear();
+        dhtMap.clear();
+        mappings.clear();
+
+        this.topVer.set(topVer);
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridNearTxLocal.class, this, "mappings", mappings.keySet(), "super", super.toString());
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index a08637d..b602a7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -75,6 +75,9 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
     /** Task name hash. */
     private int taskNameHash;
 
+    /** {@code True} if first optimistic tx prepare request sent from client node. */
+    private boolean firstClientReq;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -92,8 +95,13 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
      * @param txNodes Transaction nodes mapping.
      * @param last {@code True} if this last prepare request for node.
      * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare.
+     * @param onePhaseCommit One phase commit flag.
+     * @param retVal Return value flag.
+     * @param implicitSingle Implicit single flag.
+     * @param explicitLock Explicit lock flag.
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash.
+     * @param firstClientReq {@code True} if first optimistic tx prepare request sent from client node.
      */
     public GridNearTxPrepareRequest(
         IgniteUuid futId,
@@ -110,11 +118,13 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
         boolean implicitSingle,
         boolean explicitLock,
         @Nullable UUID subjId,
-        int taskNameHash
+        int taskNameHash,
+        boolean firstClientReq
     ) {
         super(tx, reads, writes, txNodes, onePhaseCommit);
 
         assert futId != null;
+        assert !firstClientReq || tx.optimistic() : tx;
 
         this.futId = futId;
         this.topVer = topVer;
@@ -126,6 +136,14 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
         this.explicitLock = explicitLock;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
+        this.firstClientReq = firstClientReq;
+    }
+
+    /**
+     * @return {@code True} if first optimistic tx prepare request sent from client node.
+     */
+    public boolean firstClientRequest() {
+        return firstClientReq;
     }
 
     /**
@@ -273,60 +291,66 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 writer.incrementState();
 
             case 24:
-                if (!writer.writeIgniteUuid("futId", futId))
+                if (!writer.writeBoolean("firstClientReq", firstClientReq))
                     return false;
 
                 writer.incrementState();
 
             case 25:
-                if (!writer.writeBoolean("implicitSingle", implicitSingle))
+                if (!writer.writeIgniteUuid("futId", futId))
                     return false;
 
                 writer.incrementState();
 
             case 26:
-                if (!writer.writeBoolean("last", last))
+                if (!writer.writeBoolean("implicitSingle", implicitSingle))
                     return false;
 
                 writer.incrementState();
 
             case 27:
-                if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID))
+                if (!writer.writeBoolean("last", last))
                     return false;
 
                 writer.incrementState();
 
             case 28:
-                if (!writer.writeIgniteUuid("miniId", miniId))
+                if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID))
                     return false;
 
                 writer.incrementState();
 
             case 29:
-                if (!writer.writeBoolean("near", near))
+                if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
             case 30:
-                if (!writer.writeBoolean("retVal", retVal))
+                if (!writer.writeBoolean("near", near))
                     return false;
 
                 writer.incrementState();
 
             case 31:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeBoolean("retVal", retVal))
                     return false;
 
                 writer.incrementState();
 
             case 32:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 33:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 34:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -357,7 +381,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 24:
-                futId = reader.readIgniteUuid("futId");
+                firstClientReq = reader.readBoolean("firstClientReq");
 
                 if (!reader.isLastRead())
                     return false;
@@ -365,7 +389,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 25:
-                implicitSingle = reader.readBoolean("implicitSingle");
+                futId = reader.readIgniteUuid("futId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -373,7 +397,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 26:
-                last = reader.readBoolean("last");
+                implicitSingle = reader.readBoolean("implicitSingle");
 
                 if (!reader.isLastRead())
                     return false;
@@ -381,7 +405,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 27:
-                lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID);
+                last = reader.readBoolean("last");
 
                 if (!reader.isLastRead())
                     return false;
@@ -389,7 +413,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 28:
-                miniId = reader.readIgniteUuid("miniId");
+                lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID);
 
                 if (!reader.isLastRead())
                     return false;
@@ -397,7 +421,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 29:
-                near = reader.readBoolean("near");
+                miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -405,7 +429,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 30:
-                retVal = reader.readBoolean("retVal");
+                near = reader.readBoolean("near");
 
                 if (!reader.isLastRead())
                     return false;
@@ -413,7 +437,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 31:
-                subjId = reader.readUuid("subjId");
+                retVal = reader.readBoolean("retVal");
 
                 if (!reader.isLastRead())
                     return false;
@@ -421,7 +445,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 32:
-                taskNameHash = reader.readInt("taskNameHash");
+                subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -429,6 +453,14 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 33:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 34:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -448,7 +480,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 34;
+        return 35;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index f8c07f7..e4dfd4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -83,6 +83,9 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
     @GridDirectCollection(IgniteTxKey.class)
     private Collection<IgniteTxKey> filterFailedKeys;
 
+    /** */
+    private boolean clientRemap;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -95,9 +98,11 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
      * @param futId Future ID.
      * @param miniId Mini future ID.
      * @param dhtVer DHT version.
+     * @param writeVer Write version.
      * @param invalidParts Invalid partitions.
      * @param retVal Return value.
      * @param err Error.
+     * @param clientRemap {@code True} if client node should remap transaction.
      */
     public GridNearTxPrepareResponse(
         GridCacheVersion xid,
@@ -107,7 +112,8 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
         GridCacheVersion writeVer,
         Collection<Integer> invalidParts,
         GridCacheReturn retVal,
-        Throwable err
+        Throwable err,
+        boolean clientRemap
     ) {
         super(xid, err);
 
@@ -121,6 +127,14 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
         this.writeVer = writeVer;
         this.invalidParts = invalidParts;
         this.retVal = retVal;
+        this.clientRemap = clientRemap;
+    }
+
+    /**
+     * @return {@code True} if client node should remap transaction.
+     */
+    public boolean clientRemap() {
+        return clientRemap;
     }
 
     /**
@@ -330,60 +344,66 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
 
         switch (writer.state()) {
             case 10:
-                if (!writer.writeMessage("dhtVer", dhtVer))
+                if (!writer.writeBoolean("clientRemap", clientRemap))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeCollection("filterFailedKeys", filterFailedKeys, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("dhtVer", dhtVer))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeIgniteUuid("futId", futId))
+                if (!writer.writeCollection("filterFailedKeys", filterFailedKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeCollection("invalidParts", invalidParts, MessageCollectionItemType.INT))
+                if (!writer.writeIgniteUuid("futId", futId))
                     return false;
 
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeIgniteUuid("miniId", miniId))
+                if (!writer.writeCollection("invalidParts", invalidParts, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG))
+                if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeMessage("retVal", retVal))
+                if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 19:
+                if (!writer.writeMessage("retVal", retVal))
+                    return false;
+
+                writer.incrementState();
+
+            case 20:
                 if (!writer.writeMessage("writeVer", writeVer))
                     return false;
 
@@ -406,7 +426,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
 
         switch (reader.state()) {
             case 10:
-                dhtVer = reader.readMessage("dhtVer");
+                clientRemap = reader.readBoolean("clientRemap");
 
                 if (!reader.isLastRead())
                     return false;
@@ -414,7 +434,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 11:
-                filterFailedKeys = reader.readCollection("filterFailedKeys", MessageCollectionItemType.MSG);
+                dhtVer = reader.readMessage("dhtVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -422,7 +442,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 12:
-                futId = reader.readIgniteUuid("futId");
+                filterFailedKeys = reader.readCollection("filterFailedKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -430,7 +450,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 13:
-                invalidParts = reader.readCollection("invalidParts", MessageCollectionItemType.INT);
+                futId = reader.readIgniteUuid("futId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -438,7 +458,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 14:
-                miniId = reader.readIgniteUuid("miniId");
+                invalidParts = reader.readCollection("invalidParts", MessageCollectionItemType.INT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -446,7 +466,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 15:
-                ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG);
+                miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -454,7 +474,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 16:
-                ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG);
+                ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -462,7 +482,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 17:
-                pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
+                ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -470,7 +490,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 18:
-                retVal = reader.readMessage("retVal");
+                pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -478,6 +498,14 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 19:
+                retVal = reader.readMessage("retVal");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 20:
                 writeVer = reader.readMessage("writeVer");
 
                 if (!reader.isLastRead())
@@ -497,7 +525,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 20;
+        return 21;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 5f877ec..cb86e0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -706,4 +706,9 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
      * @return Public API proxy.
      */
     public TransactionProxy proxy();
+
+    /**
+     * @param topVer New topology version.
+     */
+    public void onRemap(AffinityTopologyVersion topVer);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index eb8825e..8cb9cc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -184,7 +184,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
     private AtomicReference<GridFutureAdapter<IgniteInternalTx>> finFut = new AtomicReference<>();
 
     /** Topology version. */
-    private AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>(AffinityTopologyVersion.NONE);
+    protected AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>(AffinityTopologyVersion.NONE);
 
     /** Mutex. */
     private final Lock lock = new ReentrantLock();
@@ -493,13 +493,17 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
     }
 
     /** {@inheritDoc} */
+    @Override public void onRemap(AffinityTopologyVersion topVer) {
+        assert false;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean hasTransforms() {
         return transform;
     }
 
     /** {@inheritDoc} */
-    @Override
-    public boolean markPreparing() {
+    @Override public boolean markPreparing() {
         return preparing.compareAndSet(false, true);
     }
 
@@ -1716,6 +1720,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
         }
 
         /** {@inheritDoc} */
+        @Override public void onRemap(AffinityTopologyVersion topVer) {
+            throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
+        }
+
+        /** {@inheritDoc} */
         @Override public boolean empty() {
             throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index f466bf2..f815a73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -229,14 +229,22 @@ public class IgniteTxHandler {
             return null;
         }
 
+        IgniteTxEntry firstEntry = null;
+
         try {
-            for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes()))
+            for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) {
                 e.unmarshal(ctx, false, ctx.deploy().globalLoader());
+
+                if (firstEntry == null)
+                    firstEntry = e;
+            }
         }
         catch (IgniteCheckedException e) {
             return new GridFinishedFuture<>(e);
         }
 
+        assert firstEntry != null : req;
+
         GridDhtTxLocal tx;
 
         GridCacheVersion mappedVer = ctx.tm().mappedVersion(req.version());
@@ -253,36 +261,87 @@ public class IgniteTxHandler {
             }
         }
         else {
-            tx = new GridDhtTxLocal(
-                ctx,
-                nearNode.id(),
-                req.version(),
-                req.futureId(),
-                req.miniId(),
-                req.threadId(),
-                req.implicitSingle(),
-                req.implicitSingle(),
-                req.system(),
-                req.explicitLock(),
-                req.policy(),
-                req.concurrency(),
-                req.isolation(),
-                req.timeout(),
-                req.isInvalidate(),
-                false,
-                req.txSize(),
-                req.transactionNodes(),
-                req.subjectId(),
-                req.taskNameHash()
-            );
+            GridDhtPartitionTopology top = null;
 
-            tx = ctx.tm().onCreated(null, tx);
+            if (req.firstClientRequest()) {
+                assert req.concurrency().equals(OPTIMISTIC) : req;
 
-            if (tx != null)
-                tx.topologyVersion(req.topologyVersion());
-            else
-                U.warn(log, "Failed to create local transaction (was transaction rolled back?) [xid=" +
-                    req.version() + ", req=" + req + ']');
+                top = firstEntry.context().topology();
+
+                top.readLock();
+            }
+
+            try {
+                if (top != null && !top.topologyVersion().equals(req.topologyVersion())) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Client topology version mismatch, need remap transaction [" +
+                            "reqTopVer=" + req.topologyVersion() +
+                            ", locTopVer=" + top.topologyVersion() +
+                            ", req=" + req + ']');
+                    }
+
+                    GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+                        req.version(),
+                        req.futureId(),
+                        req.miniId(),
+                        req.version(),
+                        req.version(),
+                        null,
+                        null,
+                        null,
+                        true);
+
+                    try {
+                        ctx.io().send(nearNode, res, req.policy());
+                    }
+                    catch (ClusterTopologyCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to send client tx remap response, client node failed " +
+                                "[node=" + nearNode + ", req=" + req + ']');
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to send client tx remap response " +
+                            "[node=" + nearNode + ", req=" + req + ']', e);
+                    }
+
+                    return new GridFinishedFuture<>(res);
+                }
+
+                tx = new GridDhtTxLocal(
+                    ctx,
+                    nearNode.id(),
+                    req.version(),
+                    req.futureId(),
+                    req.miniId(),
+                    req.threadId(),
+                    req.implicitSingle(),
+                    req.implicitSingle(),
+                    req.system(),
+                    req.explicitLock(),
+                    req.policy(),
+                    req.concurrency(),
+                    req.isolation(),
+                    req.timeout(),
+                    req.isInvalidate(),
+                    false,
+                    req.txSize(),
+                    req.transactionNodes(),
+                    req.subjectId(),
+                    req.taskNameHash()
+                );
+
+                tx = ctx.tm().onCreated(null, tx);
+
+                if (tx != null)
+                    tx.topologyVersion(req.topologyVersion());
+                else
+                    U.warn(log, "Failed to create local transaction (was transaction rolled back?) [xid=" +
+                        req.version() + ", req=" + req + ']');
+            }
+            finally {
+                if (top != null)
+                    top.readUnlock();
+            }
         }
 
         if (tx != null) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index f964d39..3b80c2f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -38,6 +38,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.testframework.*;
 import org.apache.ignite.testframework.junits.common.*;
 import org.apache.ignite.transactions.*;
+import org.jetbrains.annotations.*;
 
 import javax.cache.*;
 import java.util.*;
@@ -228,7 +229,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
 
         final Map<Integer, Integer> map = new HashMap<>();
 
-        for (int i = 0; i < 1; i++)
+        for (int i = 0; i < 100; i++)
             map.put(i, i);
 
         TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
@@ -252,7 +253,76 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
 
         IgniteEx ignite3 = startGrid(3);
 
-        log.info("Stop block1.");
+        log.info("Stop block.");
+
+        spi.stopBlock();
+
+        putFut.get();
+
+        checkData(map, 4);
+
+        map.clear();
+
+        for (int i = 0; i < 100; i++)
+            map.put(i, i + 1);
+
+        cache.putAll(map);
+
+        checkData(map, 4);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void _testPessimisticTx() throws Exception {
+        ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setBackups(0);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setRebalanceMode(SYNC);
+
+        IgniteEx ignite0 = startGrid(0);
+        IgniteEx ignite1 = startGrid(1);
+
+        client = true;
+
+        final Ignite ignite2 = startGrid(2);
+
+        assertTrue(ignite2.configuration().isClientMode());
+
+        final Map<Integer, Integer> map = new HashMap<>();
+
+        for (int i = 0; i < 1; i++)
+            map.put(i, i);
+
+        TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
+
+        spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id());
+        spi.blockMessages(GridNearLockRequest.class, ignite1.localNode().id());
+
+        final IgniteCache<Integer, Integer> cache = ignite2.cache(null);
+
+        IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try (Transaction tx = ignite2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.putAll(map);
+
+                    tx.commit();
+                }
+
+                return null;
+            }
+        });
+
+        assertFalse(putFut.isDone());
+
+        client = false;
+
+        IgniteEx ignite3 = startGrid(3);
+
+        log.info("Stop block.");
 
         spi.stopBlock();
 
@@ -264,6 +334,99 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
     /**
      * @throws Exception If failed.
      */
+    public void testOptimisticTxMessageClientFirstFlag() throws Exception {
+        ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setBackups(1);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setRebalanceMode(SYNC);
+
+        IgniteEx ignite0 = startGrid(0);
+        IgniteEx ignite1 = startGrid(1);
+        IgniteEx ignite2 = startGrid(2);
+
+        client = true;
+
+        Ignite ignite3 = startGrid(3);
+
+        assertTrue(ignite3.configuration().isClientMode());
+
+        TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi();
+
+        IgniteCache<Integer, Integer> cache = ignite3.cache(null);
+
+        List<Integer> keys0 = primaryKeys(ignite0.cache(null), 2, 0);
+        List<Integer> keys1 = primaryKeys(ignite1.cache(null), 2, 0);
+        List<Integer> keys2 = primaryKeys(ignite2.cache(null), 2, 0);
+
+        LinkedHashMap<Integer, Integer> map = new LinkedHashMap<>();
+
+        map.put(keys0.get(0), 1);
+        map.put(keys1.get(0), 2);
+        map.put(keys2.get(0), 3);
+        map.put(keys0.get(1), 4);
+        map.put(keys1.get(1), 5);
+        map.put(keys2.get(1), 6);
+
+        spi.record(GridNearTxPrepareRequest.class);
+
+        try (Transaction tx = ignite3.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) {
+            for (Map.Entry<Integer, Integer> e : map.entrySet())
+                cache.put(e.getKey(), e.getValue());
+
+            tx.commit();
+        }
+
+        checkClientPrepareMessages(spi.recordedMessages(), 6);
+
+        checkData(map, 4);
+
+        cache.putAll(map);
+
+        checkClientPrepareMessages(spi.recordedMessages(), 6);
+
+        spi.record(null);
+
+        checkData(map, 4);
+
+        IgniteCache<Integer, Integer> cache0 = ignite0.cache(null);
+
+        TestCommunicationSpi spi0 = (TestCommunicationSpi)ignite0.configuration().getCommunicationSpi();
+
+        spi0.record(GridNearTxPrepareRequest.class);
+
+        cache0.putAll(map);
+
+        spi0.record(null);
+
+        List<Object> msgs = spi0.recordedMessages();
+
+        assertEquals(4, msgs.size());
+
+        for (Object msg : msgs)
+            assertFalse(((GridNearTxPrepareRequest)msg).firstClientRequest());
+
+        checkData(map, 4);
+    }
+
+    /**
+     * @param msgs Messages.
+     * @param expCnt Expected number of messages.
+     */
+    private void checkClientPrepareMessages(List<Object> msgs, int expCnt) {
+        assertEquals(expCnt, msgs.size());
+
+        assertTrue(((GridNearTxPrepareRequest)msgs.get(0)).firstClientRequest());
+
+        for (int i = 1; i < msgs.size(); i++)
+            assertFalse(((GridNearTxPrepareRequest) msgs.get(i)).firstClientRequest());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testLockRemoveAfterClientFailed() throws Exception {
         ccfg = new CacheConfiguration();
 
@@ -376,7 +539,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
     /**
      * @throws Exception If failed.
      */
-    public void testPessimisticTxPutAllMultinode() throws Exception {
+    public void _testPessimisticTxPutAllMultinode() throws Exception {
         putAllMultinode(null, true);
     }
 
@@ -579,12 +742,21 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
         /** */
         private Map<Class<?>, Set<UUID>> blockCls = new HashMap<>();
 
+        /** */
+        private Class<?> recordCls;
+
+        /** */
+        private List<Object> recordedMsgs = new ArrayList<>();
+
         /** {@inheritDoc} */
         @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
             if (msg instanceof GridIoMessage) {
                 Object msg0 = ((GridIoMessage)msg).message();
 
                 synchronized (this) {
+                    if (recordCls != null && msg0.getClass().equals(recordCls))
+                        recordedMsgs.add(msg0);
+
                     Set<UUID> blockNodes = blockCls.get(msg0.getClass());
 
                     if (F.contains(blockNodes, node.id())) {
@@ -602,6 +774,28 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
         }
 
         /**
+         * @param recordCls Message class to record.
+         */
+        void record(@Nullable Class<?> recordCls) {
+            synchronized (this) {
+                this.recordCls = recordCls;
+            }
+        }
+
+        /**
+         * @return Recorded messages.
+         */
+        List<Object> recordedMessages() {
+            synchronized (this) {
+                List<Object> msgs = recordedMsgs;
+
+                recordedMsgs = new ArrayList<>();
+
+                return msgs;
+            }
+        }
+
+        /**
          * @param cls Message class.
          * @param nodeId Node ID.
          */