You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/03 03:03:22 UTC

[13/50] [abbrv] ignite git commit: Merge branch master into ignite-264

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --cc modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index de9d112,0540148..3fa39cc
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@@ -141,12 -142,8 +142,12 @@@ public class MessageCodeGenerator 
  
          MessageCodeGenerator gen = new MessageCodeGenerator(srcDir);
  
-         gen.generateAndWrite(DataStreamerEntry.class);
+ //        gen.generateAndWrite(DataStreamerEntry.class);
  
 +//        gen.generateAndWrite(GridDistributedUnlockRequest.class);
 +//        gen.generateAndWrite(GridNearUnlockRequest.class);
 +//        gen.generateAndWrite(GridDhtUnlockRequest.class);
 +//
  //        gen.generateAndWrite(GridDistributedLockRequest.class);
  //        gen.generateAndWrite(GridDistributedLockResponse.class);
  //        gen.generateAndWrite(GridNearLockRequest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index 0000000,663ed90..2978c77
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@@ -1,0 -1,506 +1,530 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.cache.distributed;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.cluster.*;
+ import org.apache.ignite.internal.processors.cache.*;
++import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.util.*;
+ import java.util.concurrent.atomic.*;
+ 
+ /**
+  * Future verifying that all remote transactions related to transaction were prepared or committed.
+  */
+ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolean> implements GridCacheFuture<Boolean> {
+     /** */         
+     private static final long serialVersionUID = 0L;
+     
+     /** Logger reference. */
+     private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+ 
+     /** Logger. */
+     private static IgniteLogger log;
+ 
+     /** Trackable flag. */
+     private boolean trackable = true;
+ 
+     /** Context. */
+     private final GridCacheSharedContext<?, ?> cctx;
+ 
+     /** Future ID. */
+     private final IgniteUuid futId = IgniteUuid.randomUuid();
+ 
+     /** Transaction. */
+     private final IgniteInternalTx tx;
+ 
+     /** All involved nodes. */
+     private final Map<UUID, ClusterNode> nodes;
+ 
+     /** ID of failed node started transaction. */
+     private final UUID failedNodeId;
+ 
+     /** Transaction nodes mapping. */
+     private final Map<UUID, Collection<UUID>> txNodes;
+ 
+     /** */
+     private final boolean nearTxCheck;
+ 
+     /**
+      * @param cctx Context.
+      * @param tx Transaction.
+      * @param failedNodeId ID of failed node started transaction.
+      * @param txNodes Transaction mapping.
+      */
+     @SuppressWarnings("ConstantConditions")
+     public GridCacheTxRecoveryFuture(GridCacheSharedContext<?, ?> cctx,
+         IgniteInternalTx tx,
+          UUID failedNodeId,
+         Map<UUID, Collection<UUID>> txNodes)
+     {
+         super(cctx.kernalContext(), CU.boolReducer());
+ 
+         this.cctx = cctx;
+         this.tx = tx;
+         this.txNodes = txNodes;
+         this.failedNodeId = failedNodeId;
+ 
+         if (log == null)
+             log = U.logger(cctx.kernalContext(), logRef, GridCacheTxRecoveryFuture.class);
+ 
+         nodes = new GridLeanMap<>();
+ 
+         UUID locNodeId = cctx.localNodeId();
+ 
+         for (Map.Entry<UUID, Collection<UUID>> e : tx.transactionNodes().entrySet()) {
+             if (!locNodeId.equals(e.getKey()) && !failedNodeId.equals(e.getKey()) && !nodes.containsKey(e.getKey())) {
+                 ClusterNode node = cctx.discovery().node(e.getKey());
+ 
+                 if (node != null)
+                     nodes.put(node.id(), node);
+                 else if (log.isDebugEnabled())
+                     log.debug("Transaction node left (will ignore) " + e.getKey());
+             }
+ 
+             for (UUID nodeId : e.getValue()) {
+                 if (!locNodeId.equals(nodeId) && !failedNodeId.equals(nodeId) && !nodes.containsKey(nodeId)) {
+                     ClusterNode node = cctx.discovery().node(nodeId);
+ 
+                     if (node != null)
+                         nodes.put(node.id(), node);
+                     else if (log.isDebugEnabled())
+                         log.debug("Transaction node left (will ignore) " + e.getKey());
+                 }
+             }
+         }
+ 
+         UUID nearNodeId = tx.eventNodeId();
+ 
+         nearTxCheck = !failedNodeId.equals(nearNodeId) && cctx.discovery().alive(nearNodeId);
+     }
+ 
+     /**
+      * Initializes future.
+      */
+     @SuppressWarnings("ConstantConditions")
+     public void prepare() {
+         if (nearTxCheck) {
+             UUID nearNodeId = tx.eventNodeId();
+ 
+             if (cctx.localNodeId().equals(nearNodeId)) {
+                 IgniteInternalFuture<Boolean> fut = cctx.tm().txCommitted(tx.nearXidVersion());
+ 
+                 fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                     @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+                         try {
+                             onDone(fut.get());
+                         }
+                         catch (IgniteCheckedException e) {
+                             onDone(e);
+                         }
+                     }
+                 });
+             }
+             else {
+                 MiniFuture fut = new MiniFuture(tx.eventNodeId());
+ 
+                 add(fut);
+ 
+                 GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(
+                     tx,
+                     0,
+                     true,
+                     futureId(),
+                     fut.futureId());
+ 
+                 try {
+                     cctx.io().send(nearNodeId, req, tx.ioPolicy());
+                 }
+                 catch (ClusterTopologyCheckedException e) {
+                     fut.onNodeLeft();
+                 }
+                 catch (IgniteCheckedException e) {
+                     fut.onError(e);
+                 }
+ 
+                 markInitialized();
+             }
+ 
+             return;
+         }
+ 
+         // First check transactions on local node.
+         int locTxNum = nodeTransactions(cctx.localNodeId());
+ 
+         if (locTxNum > 1) {
+             IgniteInternalFuture<Boolean> fut = cctx.tm().txsPreparedOrCommitted(tx.nearXidVersion(), locTxNum);
+ 
+             if (fut == null || fut.isDone()) {
+                 boolean prepared;
+ 
+                 try {
+                     prepared = fut == null ? true : fut.get();
+                 }
+                 catch (IgniteCheckedException e) {
+                     U.error(log, "Check prepared transaction future failed: " + e, e);
+ 
+                     prepared = false;
+                 }
+ 
+                 if (!prepared) {
+                     onDone(false);
+ 
+                     markInitialized();
+ 
+                     return;
+                 }
+             }
+             else {
+                 fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                     @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+                         boolean prepared;
+ 
+                         try {
+                             prepared = fut.get();
+                         }
+                         catch (IgniteCheckedException e) {
+                             U.error(log, "Check prepared transaction future failed: " + e, e);
+ 
+                             prepared = false;
+                         }
+ 
+                         if (!prepared) {
+                             onDone(false);
+ 
+                             markInitialized();
+                         }
+                         else
+                             proceedPrepare();
+                     }
+                 });
+ 
+                 return;
+             }
+         }
+ 
+         proceedPrepare();
+     }
+ 
+     /**
+      * Process prepare after local check.
+      */
+     private void proceedPrepare() {
+         for (Map.Entry<UUID, Collection<UUID>> entry : txNodes.entrySet()) {
+             UUID nodeId = entry.getKey();
+ 
+             // Skip left nodes and local node.
+             if (!nodes.containsKey(nodeId) && nodeId.equals(cctx.localNodeId()))
+                 continue;
+ 
+             /*
+              * If primary node failed then send message to all backups, otherwise
+              * send message only to primary node.
+              */
+ 
+             if (nodeId.equals(failedNodeId)) {
+                 for (UUID id : entry.getValue()) {
+                     // Skip backup node if it is local node or if it is also was mapped as primary.
+                     if (txNodes.containsKey(id) || id.equals(cctx.localNodeId()))
+                         continue;
+ 
+                     MiniFuture fut = new MiniFuture(id);
+ 
+                     add(fut);
+ 
+                     GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(tx,
+                         nodeTransactions(id),
+                         false,
+                         futureId(),
+                         fut.futureId());
+ 
+                     try {
+                         cctx.io().send(id, req, tx.ioPolicy());
+                     }
+                     catch (ClusterTopologyCheckedException ignored) {
+                         fut.onNodeLeft();
+                     }
+                     catch (IgniteCheckedException e) {
+                         fut.onError(e);
+ 
+                         break;
+                     }
+                 }
+             }
+             else {
+                 MiniFuture fut = new MiniFuture(nodeId);
+ 
+                 add(fut);
+ 
+                 GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(
+                     tx,
+                     nodeTransactions(nodeId),
+                     false,
+                     futureId(),
+                     fut.futureId());
+ 
+                 try {
+                     cctx.io().send(nodeId, req, tx.ioPolicy());
+                 }
+                 catch (ClusterTopologyCheckedException ignored) {
+                     fut.onNodeLeft();
+                 }
+                 catch (IgniteCheckedException e) {
+                     fut.onError(e);
+ 
+                     break;
+                 }
+             }
+         }
+ 
++        // Specifically check originating near node.
++        if (tx instanceof GridDhtTxRemote) {
++            UUID nearNodeId = ((GridDhtTxRemote)tx).nearNodeId();
++
++            if (cctx.localNodeId().equals(nearNodeId))
++                add(cctx.tm().nearTxCommitted(tx.nearXidVersion()));
++            else {
++                MiniFuture fut = new MiniFuture(nearNodeId);
++
++                add(fut);
++
++                GridCacheOptimisticCheckPreparedTxRequest<K, V> req = new GridCacheOptimisticCheckPreparedTxRequest<>(
++                    tx, 1, futureId(), fut.futureId(), true);
++
++                try {
++                    cctx.io().send(nearNodeId, req, tx.ioPolicy());
++                }
++                catch (ClusterTopologyCheckedException ignored) {
++                    fut.onNodeLeft();
++                }
++                catch (IgniteCheckedException e) {
++                    fut.onError(e);
++                }
++            }
++        }
++
+         markInitialized();
+     }
+ 
+     /**
+      * @param nodeId Node ID.
+      * @return Number of transactions on node.
+      */
+     private int nodeTransactions(UUID nodeId) {
+         int cnt = txNodes.containsKey(nodeId) ? 1 : 0; // +1 if node is primary.
+ 
+         for (Collection<UUID> backups : txNodes.values()) {
+             for (UUID backup : backups) {
+                 if (backup.equals(nodeId)) {
+                     cnt++; // +1 if node is backup.
+ 
+                     break;
+                 }
+             }
+         }
+ 
+         return cnt;
+     }
+ 
+     /**
+      * @param nodeId Node ID.
+      * @param res Response.
+      */
+     public void onResult(UUID nodeId, GridCacheTxRecoveryResponse res) {
+         if (!isDone()) {
+             for (IgniteInternalFuture<Boolean> fut : pending()) {
+                 if (isMini(fut)) {
+                     MiniFuture f = (MiniFuture)fut;
+ 
+                     if (f.futureId().equals(res.miniId())) {
+                         assert f.nodeId().equals(nodeId);
+ 
+                         f.onResult(res);
+ 
+                         break;
+                     }
+                 }
+             }
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteUuid futureId() {
+         return futId;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public GridCacheVersion version() {
+         return tx.xidVersion();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public Collection<? extends ClusterNode> nodes() {
+         return nodes.values();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean onNodeLeft(UUID nodeId) {
+         for (IgniteInternalFuture<?> fut : futures())
+             if (isMini(fut)) {
+                 MiniFuture f = (MiniFuture)fut;
+ 
 -                if (f.nodeId().equals(nodeId)) {
++                if (f.nodeId().equals(nodeId))
+                     f.onNodeLeft();
 -
 -                    return true;
 -                }
+             }
+ 
 -        return false;
++        return true;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean trackable() {
+         return trackable;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void markNotTrackable() {
+         trackable = false;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
+         if (super.onDone(res, err)) {
+             cctx.mvcc().removeFuture(this);
+ 
+             if (err == null) {
+                 assert res != null;
+ 
+                 cctx.tm().finishTxOnRecovery(tx, res);
+             }
+             else {
+                 if (err instanceof ClusterTopologyCheckedException && nearTxCheck) {
+                     if (log.isDebugEnabled())
+                         log.debug("Failed to check transaction on near node, " +
+                             "ignoring [err=" + err + ", tx=" + tx + ']');
+                 }
+                 else {
+                     if (log.isDebugEnabled())
+                         log.debug("Failed to check prepared transactions, " +
+                             "invalidating transaction [err=" + err + ", tx=" + tx + ']');
+ 
+                     cctx.tm().salvageTx(tx);
+                 }
+             }
+         }
+ 
+         return false;
+     }
+ 
+     /**
+      * @param f Future.
+      * @return {@code True} if mini-future.
+      */
+     private boolean isMini(IgniteInternalFuture<?> f) {
+         return f.getClass().equals(MiniFuture.class);
+     }
+ 
+ 
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return S.toString(GridCacheTxRecoveryFuture.class, this, "super", super.toString());
+     }
+ 
+     /**
+      *
+      */
+     private class MiniFuture extends GridFutureAdapter<Boolean> {
+         /** */
+         private static final long serialVersionUID = 0L;
+ 
+         /** Mini future ID. */
+         private final IgniteUuid futId = IgniteUuid.randomUuid();
+ 
+         /** Node ID. */
+         private UUID nodeId;
+ 
+         /**
+          * @param nodeId Node ID.
+          */
+         private MiniFuture(UUID nodeId) {
+             this.nodeId = nodeId;
+         }
+ 
+         /**
+          * @return Node ID.
+          */
+         private UUID nodeId() {
+             return nodeId;
+         }
+ 
+         /**
+          * @return Future ID.
+          */
+         private IgniteUuid futureId() {
+             return futId;
+         }
+ 
+         /**
+          * @param e Error.
+          */
+         private void onError(Throwable e) {
+             if (log.isDebugEnabled())
+                 log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
+ 
+             onDone(e);
+         }
+ 
+         /**
+          */
+         private void onNodeLeft() {
+             if (log.isDebugEnabled())
+                 log.debug("Transaction node left grid (will ignore) [fut=" + this + ']');
+ 
+             if (nearTxCheck) {
+                 // Near and originating nodes left, need initiate tx check.
+                 cctx.tm().commitIfPrepared(tx);
+ 
+                 onDone(new ClusterTopologyCheckedException("Transaction node left grid (will ignore)."));
+             }
+             else
+                 onDone(true);
+         }
+ 
+         /**
+          * @param res Result callback.
+          */
+         private void onResult(GridCacheTxRecoveryResponse res) {
+             onDone(res.success());
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public String toString() {
+             return S.toString(MiniFuture.class, this, "done", isDone(), "err", error());
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
index 0000000,2f49ef4..15d2141
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
@@@ -1,0 -1,261 +1,268 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.cache.distributed;
+ 
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.plugin.extensions.communication.*;
+ 
+ import java.io.*;
+ import java.nio.*;
+ 
+ /**
+  * Message sent to check that transactions related to transaction were prepared on remote node.
+  */
+ public class GridCacheTxRecoveryRequest extends GridDistributedBaseMessage {
+     /** */
+     private static final long serialVersionUID = 0L;
+ 
+     /** Future ID. */
+     private IgniteUuid futId;
+ 
+     /** Mini future ID. */
+     private IgniteUuid miniId;
+ 
+     /** Near transaction ID. */
+     private GridCacheVersion nearXidVer;
+ 
+     /** Expected number of transactions on node. */
+     private int txNum;
+ 
+     /** System transaction flag. */
+     private boolean sys;
+ 
+     /** {@code True} if should check only tx on near node. */
+     private boolean nearTxCheck;
+ 
+     /**
+      * Empty constructor required by {@link Externalizable}
+      */
+     public GridCacheTxRecoveryRequest() {
+         // No-op.
+     }
+ 
+     /**
+      * @param tx Transaction.
+      * @param txNum Expected number of transactions on remote node.
+      * @param nearTxCheck {@code True} if should check only tx on near node.
+      * @param futId Future ID.
+      * @param miniId Mini future ID.
+      */
+     public GridCacheTxRecoveryRequest(IgniteInternalTx tx,
+         int txNum,
+         boolean nearTxCheck,
+         IgniteUuid futId,
+         IgniteUuid miniId)
+     {
+         super(tx.xidVersion(), 0);
+ 
+         nearXidVer = tx.nearXidVersion();
+         sys = tx.system();
+ 
+         this.futId = futId;
+         this.miniId = miniId;
+         this.txNum = txNum;
+         this.nearTxCheck = nearTxCheck;
+     }
+ 
+     /**
+      * @return {@code True} if should check only tx on near node.
+      */
+     public boolean nearTxCheck() {
+         return nearTxCheck;
+     }
+ 
+     /**
+      * @return Near version.
+      */
+     public GridCacheVersion nearXidVersion() {
+         return nearXidVer;
+     }
+ 
+     /**
+      * @return Future ID.
+      */
+     public IgniteUuid futureId() {
+         return futId;
+     }
+ 
+     /**
+      * @return Mini future ID.
+      */
+     public IgniteUuid miniId() {
+         return miniId;
+     }
+ 
+     /**
+      * @return Expected number of transactions on node.
+      */
+     public int transactions() {
+         return txNum;
+     }
+ 
+     /**
+      * @return System transaction flag.
+      */
+     public boolean system() {
+         return sys;
+     }
+ 
++    /**
++     * @return Near check flag.
++     */
++    public boolean nearCheck() {
++        return nearCheck;
++    }
++
+     /** {@inheritDoc} */
+     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+         writer.setBuffer(buf);
+ 
+         if (!super.writeTo(buf, writer))
+             return false;
+ 
+         if (!writer.isHeaderWritten()) {
+             if (!writer.writeHeader(directType(), fieldsCount()))
+                 return false;
+ 
+             writer.onHeaderWritten();
+         }
+ 
+         switch (writer.state()) {
+             case 7:
+                 if (!writer.writeIgniteUuid("futId", futId))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 8:
+                 if (!writer.writeIgniteUuid("miniId", miniId))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 9:
+                 if (!writer.writeBoolean("nearTxCheck", nearTxCheck))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 10:
+                 if (!writer.writeMessage("nearXidVer", nearXidVer))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 11:
+                 if (!writer.writeBoolean("sys", sys))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 12:
+                 if (!writer.writeInt("txNum", txNum))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+         }
+ 
+         return true;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+         reader.setBuffer(buf);
+ 
+         if (!reader.beforeMessageRead())
+             return false;
+ 
+         if (!super.readFrom(buf, reader))
+             return false;
+ 
+         switch (reader.state()) {
+             case 7:
+                 futId = reader.readIgniteUuid("futId");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 8:
+                 miniId = reader.readIgniteUuid("miniId");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 9:
+                 nearTxCheck = reader.readBoolean("nearTxCheck");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 10:
+                 nearXidVer = reader.readMessage("nearXidVer");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 11:
+                 sys = reader.readBoolean("sys");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 12:
+                 txNum = reader.readInt("txNum");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+         }
+ 
+         return true;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public byte directType() {
+         return 16;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public byte fieldsCount() {
 -        return 13;
++        return 12;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return S.toString(GridCacheTxRecoveryRequest.class, this, "super", super.toString());
+     }
+ }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
index 63c28e9,7b84f32..f759fcc
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
@@@ -55,15 -54,16 +54,6 @@@ public abstract class GridDistributedBa
      @GridToStringExclude
      private byte[] candsByIdxBytes;
  
-     /** Collections of local lock candidates. */
 -    /** Committed versions with order higher than one for this message (needed for commit ordering). */
--    @GridToStringInclude
-     @GridDirectTransient
-     private Map<KeyCacheObject, Collection<GridCacheMvccCandidate>> candsByKey;
 -    @GridDirectCollection(GridCacheVersion.class)
 -    private Collection<GridCacheVersion> committedVers;
--
-     /** Collections of local lock candidates in serialized form. */
-     @GridToStringExclude
-     private byte[] candsByKeyBytes;
 -    /** Rolled back versions with order higher than one for this message (needed for commit ordering). */
 -    @GridToStringInclude
 -    @GridDirectCollection(GridCacheVersion.class)
 -    private Collection<GridCacheVersion> rolledbackVers;
--
      /** Count of keys referenced in candidates array (needed only locally for optimization). */
      @GridToStringInclude
      @GridDirectTransient
@@@ -228,13 -207,19 +173,7 @@@
  
                  writer.incrementState();
  
--            case 4:
-                 if (!writer.writeByteArray("candsByKeyBytes", candsByKeyBytes))
 -                if (!writer.writeCollection("committedVers", committedVers, MessageCollectionItemType.MSG))
--                    return false;
--
--                writer.incrementState();
--
--            case 5:
 -                if (!writer.writeCollection("rolledbackVers", rolledbackVers, MessageCollectionItemType.MSG))
 -                    return false;
 -
 -                writer.incrementState();
 -
+             case 6:
                  if (!writer.writeMessage("ver", ver))
                      return false;
  
@@@ -264,15 -249,23 +203,7 @@@
  
                  reader.incrementState();
  
--            case 4:
-                 candsByKeyBytes = reader.readByteArray("candsByKeyBytes");
 -                committedVers = reader.readCollection("committedVers", MessageCollectionItemType.MSG);
--
--                if (!reader.isLastRead())
--                    return false;
--
--                reader.incrementState();
--
--            case 5:
 -                rolledbackVers = reader.readCollection("rolledbackVers", MessageCollectionItemType.MSG);
 -
 -                if (!reader.isLastRead())
 -                    return false;
 -
 -                reader.incrementState();
 -
+             case 6:
                  ver = reader.readMessage("ver");
  
                  if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index 18ae318,7a84f9a..5d79f0a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@@ -94,9 -98,12 +91,9 @@@ public class GridDistributedTxFinishReq
          boolean commit,
          boolean invalidate,
          boolean sys,
-         GridIoPolicy plc,
+         byte plc,
          boolean syncCommit,
          boolean syncRollback,
 -        GridCacheVersion baseVer,
 -        Collection<GridCacheVersion> committedVers,
 -        Collection<GridCacheVersion> rolledbackVers,
          int txSize
      ) {
          super(xidVer, 0);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 1af96e9,e160529..5080e3a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@@ -747,9 -756,16 +754,9 @@@ public final class GridDhtLockFuture ex
                  try {
                      while (true) {
                          try {
 -                            hasRmtNodes = cctx.dhtMap(
 -                                nearNodeId,
 -                                topVer,
 -                                entry,
 -                                tx == null ? lockVer : null,
 -                                log,
 -                                dhtMap,
 -                                null);
 +                            cctx.dhtMap(nearNodeId, topVer, entry, log, dhtMap, null);
  
-                             GridCacheMvccCandidate cand = entry.mappings(lockVer);
+                             GridCacheMvccCandidate cand = entry.candidate(lockVer);
  
                              // Possible in case of lock cancellation.
                              if (cand == null) {
@@@ -850,22 -870,35 +858,33 @@@
  
                              boolean invalidateRdr = e.readerId(n.id()) != null;
  
-                             req.addDhtKey(
-                                 e.key(),
-                                 invalidateRdr,
-                                 cctx);
+                             req.addDhtKey(e.key(), invalidateRdr, cctx);
  
-                             if (needVal)
+                             if (needVal) {
                                  // Mark last added key as needed to be preloaded.
                                  req.markLastKeyForPreload();
+ 
+                                 if (tx != null) {
+                                     IgniteTxEntry txEntry = tx.entry(e.txKey());
+ 
+                                     // NOOP entries will be sent to backups on prepare step.
+                                     if (txEntry.op() == GridCacheOperation.READ)
+                                         txEntry.op(GridCacheOperation.NOOP);
+                                 }
+                             }
 -
 -                            it.set(addOwned(req, e));
                          }
  
-                         add(fut); // Append new future.
+                         if (!F.isEmpty(req.keys())) {
+                             if (tx != null)
+                                 tx.addLockTransactionNode(n);
  
-                         if (log.isDebugEnabled())
-                             log.debug("Sending DHT lock request to DHT node [node=" + n.id() + ", req=" + req + ']');
+                             add(fut); // Append new future.
  
-                         cctx.io().send(n, req, cctx.ioPolicy());
+                             if (log.isDebugEnabled())
+                                 log.debug("Sending DHT lock request to DHT node [node=" + n.id() + ", req=" + req + ']');
+ 
+                             cctx.io().send(n, req, cctx.ioPolicy());
+                         }
                      }
                      catch (IgniteCheckedException e) {
                          // Fail the whole thing.

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index fa679ef,4f081bf..da14b9f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@@ -946,9 -1064,22 +1050,15 @@@ public abstract class GridDhtTransactio
          try {
              // Send reply back to originating near node.
              GridNearLockResponse res = new GridNearLockResponse(ctx.cacheId(),
-                 req.version(), req.futureId(), req.miniId(), tx != null && tx.onePhaseCommit(), entries.size(), err);
+                 req.version(),
+                 req.futureId(),
+                 req.miniId(),
+                 tx != null && tx.onePhaseCommit(),
+                 entries.size(),
+                 err,
+                 null);
  
              if (err == null) {
 -                res.pending(localDhtPendingVersions(entries, mappedVer));
 -
 -                // We have to add completed versions for cases when nearLocal and remote transactions
 -                // execute concurrently.
 -                res.completedVersions(ctx.tm().committedVersions(req.version()),
 -                    ctx.tm().rolledbackVersions(req.version()));
 -
                  int i = 0;
  
                  for (ListIterator<GridCacheEntryEx> it = entries.listIterator(); it.hasNext();) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index dc3eeac,9aa9c17..87fe6e7
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@@ -313,8 -379,12 +379,8 @@@ public final class GridDhtTxFinishFutur
                  tx.system(),
                  tx.ioPolicy(),
                  tx.isSystemInvalidate(),
-                 tx.syncCommit(),
-                 tx.syncRollback(),
+                 sync,
+                 sync,
 -                tx.completedBase(),
 -                tx.committedVersions(),
 -                tx.rolledbackVersions(),
 -                tx.pendingVersions(),
                  tx.size(),
                  tx.subjectId(),
                  tx.taskNameHash());
@@@ -362,8 -430,12 +428,8 @@@
                      tx.system(),
                      tx.ioPolicy(),
                      tx.isSystemInvalidate(),
-                     tx.syncCommit(),
-                     tx.syncRollback(),
+                     sync,
+                     sync,
 -                    tx.completedBase(),
 -                    tx.committedVersions(),
 -                    tx.rolledbackVersions(),
 -                    tx.pendingVersions(),
                      tx.size(),
                      tx.subjectId(),
                      tx.taskNameHash());

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 1a6883b,fe72b24..52fb87a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@@ -19,11 -19,8 +19,10 @@@ package org.apache.ignite.internal.proc
  
  import org.apache.ignite.internal.*;
  import org.apache.ignite.internal.processors.affinity.*;
- import org.apache.ignite.internal.managers.communication.*;
  import org.apache.ignite.internal.processors.cache.distributed.*;
  import org.apache.ignite.internal.processors.cache.version.*;
 +import org.apache.ignite.lang.*;
 +import org.apache.ignite.transactions.*;
  import org.apache.ignite.internal.util.tostring.*;
  import org.apache.ignite.internal.util.typedef.internal.*;
  import org.apache.ignite.lang.*;
@@@ -336,7 -335,15 +335,7 @@@ public class GridDhtTxFinishRequest ext
  
                  reader.incrementState();
  
-             case 24:
 -            case 21:
 -                pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
 -
 -                if (!reader.isLastRead())
 -                    return false;
 -
 -                reader.incrementState();
 -
+             case 22:
                  subjId = reader.readUuid("subjId");
  
                  if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 1d8f2b8,b50a010..2b0e7c2
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@@ -64,6 -62,18 +62,12 @@@ public abstract class GridDhtTxLocalAda
      /** */
      private long dhtThreadId;
  
+     /** */
+     protected boolean explicitLock;
+ 
 -    /** */
 -    private boolean needsCompletedVers;
 -
 -    /** Versions of pending locks for entries of this tx. */
 -    private Collection<GridCacheVersion> pendingVers;
 -
+     /** Nodes where transactions were started on lock step. */
+     private Set<ClusterNode> lockTxNodes;
+ 
      /**
       * Empty constructor required for {@link Externalizable}.
       */

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 1b097b6,fbc8c84..1cc3175
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@@ -292,8 -291,12 +291,12 @@@ public final class GridDhtTxPrepareFutu
  
                  boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters());
  
 -                if (hasFilters || retVal || txEntry.op() == GridCacheOperation.DELETE) {
 +                if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) {
-                     cached.unswap(true, retVal);
+                     cached.unswap(retVal);
+ 
+                     boolean readThrough = (retVal || hasFilters) &&
+                         cacheCtx.config().isLoadPreviousValue() &&
+                         !txEntry.skipStore();
  
                      CacheObject val = cached.innerGet(
                          tx,
@@@ -937,15 -909,15 +947,17 @@@
  
                          for (IgniteTxEntry entry : nearMapping.writes()) {
                              try {
-                                 GridCacheMvccCandidate added = entry.cached().candidate(version());
+                                 if (entry.explicitVersion() == null) {
+                                     GridCacheMvccCandidate added = entry.cached().candidate(version());
  
 -                                    assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
 -                                        "[added=" + added + ", entry=" + entry + ']';
 +                                assert added != null : "Null candidate for non-group-lock entry " +
 +                                    "[added=" + added + ", entry=" + entry + ']';
 +                                assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
 +                                    "[added=" + added + ", entry=" + entry + ']';
  
-                                 if (added != null && added.ownerVersion() != null)
-                                     req.owned(entry.txKey(), added.ownerVersion());
+                                     if (added != null && added.ownerVersion() != null)
+                                         req.owned(entry.txKey(), added.ownerVersion());
+                                 }
  
                                  break;
                              }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 49c1fb6,221b230..8202d10
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@@ -581,8 -581,20 +578,18 @@@ public class GridDhtColocatedCache<K, V
                  GridDistributedUnlockRequest req = mapping.getValue();
  
                  if (!F.isEmpty(req.keys())) {
-                     // We don't wait for reply to this message.
-                     ctx.io().send(n, req, ctx.ioPolicy());
 -                    req.completedVersions(committed, rolledback);
 -
+                     try {
+                         // We don't wait for reply to this message.
+                         ctx.io().send(n, req, ctx.ioPolicy());
+                     }
+                     catch (ClusterTopologyCheckedException e) {
+                         if (log.isDebugEnabled())
+                             log.debug("Failed to send unlock request (node has left the grid) [keys=" + req.keys() +
+                                 ", n=" + n + ", e=" + e + ']');
+                     }
+                     catch (IgniteCheckedException e) {
+                         U.error(log, "Failed to send unlock request [keys=" + req.keys() + ", n=" + n + ']', e);
+                     }
                  }
              }
          }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index ce614cb,3d28018..d264de1
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@@ -1015,9 -1074,10 +1074,9 @@@ public final class GridNearLockFuture e
  
                                          // Lock is held at this point, so we can set the
                                          // returned value if any.
-                                         entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id());
+                                         entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer.get());
  
 -                                        entry.readyNearLock(lockVer, mappedVer, res.committedVersions(),
 -                                            res.rolledbackVersions(), res.pending());
 +                                        entry.readyNearLock(lockVer, mappedVer);
  
                                          if (inTx() && implicitTx() && tx.onePhaseCommit()) {
                                              boolean pass = res.filterResult(i);
@@@ -1317,111 -1381,132 +1380,128 @@@
                      return;
                  }
  
-                 int i = 0;
+                 if (res.clientRemapVersion() != null) {
+                     assert cctx.kernalContext().clientNode();
  
-                 AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get();
+                     IgniteInternalFuture<?> affFut =
+                         cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
  
-                 for (KeyCacheObject k : keys) {
-                     while (true) {
-                         GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
+                     if (affFut != null && !affFut.isDone()) {
+                         affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                             @Override public void apply(IgniteInternalFuture<?> fut) {
+                                 remap();
+                             }
+                         });
+                     }
+                     else
+                         remap();
+                 }
+                 else {
+                     int i = 0;
  
-                         try {
-                             if (res.dhtVersion(i) == null) {
-                                 onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
-                                     "(will fail the lock): " + res));
+                     AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get();
  
-                                 return;
-                             }
+                     for (KeyCacheObject k : keys) {
+                         while (true) {
+                             GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
  
-                             IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
+                             try {
+                                 if (res.dhtVersion(i) == null) {
+                                     onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+                                         "(will fail the lock): " + res));
  
-                             CacheObject oldVal = entry.rawGet();
-                             boolean hasOldVal = false;
-                             CacheObject newVal = res.value(i);
+                                     return;
+                                 }
  
-                             boolean readRecordable = false;
+                                 IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
  
-                             if (retval) {
-                                 readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
+                                 CacheObject oldVal = entry.rawGet();
+                                 boolean hasOldVal = false;
+                                 CacheObject newVal = res.value(i);
  
-                                 if (readRecordable)
-                                     hasOldVal = entry.hasValue();
-                             }
+                                 boolean readRecordable = false;
+ 
+                                 if (retval) {
+                                     readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
+ 
+                                     if (readRecordable)
+                                         hasOldVal = entry.hasValue();
+                                 }
  
-                             GridCacheVersion dhtVer = res.dhtVersion(i);
-                             GridCacheVersion mappedVer = res.mappedVersion(i);
+                                 GridCacheVersion dhtVer = res.dhtVersion(i);
+                                 GridCacheVersion mappedVer = res.mappedVersion(i);
  
-                             if (newVal == null) {
-                                 if (oldValTup != null) {
-                                     if (oldValTup.get1().equals(dhtVer))
-                                         newVal = oldValTup.get2();
+                                 if (newVal == null) {
+                                     if (oldValTup != null) {
+                                         if (oldValTup.get1().equals(dhtVer))
+                                             newVal = oldValTup.get2();
  
-                                     oldVal = oldValTup.get2();
+                                         oldVal = oldValTup.get2();
+                                     }
                                  }
-                             }
  
-                             // Lock is held at this point, so we can set the
-                             // returned value if any.
-                             entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id());
+                                 // Lock is held at this point, so we can set the
+                                 // returned value if any.
+                                 entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
  
-                             if (inTx() && implicitTx() && tx.onePhaseCommit()) {
-                                 boolean pass = res.filterResult(i);
+                                 if (inTx()) {
+                                     tx.hasRemoteLocks(true);
  
-                                 tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
-                             }
+                                     if (implicitTx() && tx.onePhaseCommit()) {
+                                         boolean pass = res.filterResult(i);
  
-                             entry.readyNearLock(lockVer, mappedVer);
- 
-                             if (retval) {
-                                 if (readRecordable)
-                                     cctx.events().addEvent(
-                                         entry.partition(),
-                                         entry.key(),
-                                         tx,
-                                         null,
-                                         EVT_CACHE_OBJECT_READ,
-                                         newVal,
-                                         newVal != null,
-                                         oldVal,
-                                         hasOldVal,
-                                         CU.subjectId(tx, cctx.shared()),
-                                         null,
-                                         inTx() ? tx.resolveTaskName() : null);
- 
-                                 if (cctx.cache().configuration().isStatisticsEnabled())
-                                     cctx.cache().metrics0().onRead(false);
-                             }
+                                         tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
+                                     }
+                                 }
 -
 -                                entry.readyNearLock(lockVer,
 -                                    mappedVer,
 -                                    res.committedVersions(),
 -                                    res.rolledbackVersions(),
 -                                    res.pending());
++                                
++                                entry.readyNearLock(lockVer, mappedVer);
+ 
+                                 if (retval) {
+                                     if (readRecordable)
+                                         cctx.events().addEvent(
+                                             entry.partition(),
+                                             entry.key(),
+                                             tx,
+                                             null,
+                                             EVT_CACHE_OBJECT_READ,
+                                             newVal,
+                                             newVal != null,
+                                             oldVal,
+                                             hasOldVal,
+                                             CU.subjectId(tx, cctx.shared()),
+                                             null,
+                                             inTx() ? tx.resolveTaskName() : null);
+ 
+                                     if (cctx.cache().configuration().isStatisticsEnabled())
+                                         cctx.cache().metrics0().onRead(false);
+                                 }
  
-                             if (log.isDebugEnabled())
-                                 log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+                                 if (log.isDebugEnabled())
+                                     log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
  
-                             break; // Inner while loop.
-                         }
-                         catch (GridCacheEntryRemovedException ignored) {
-                             if (log.isDebugEnabled())
-                                 log.debug("Failed to add candidates because entry was removed (will renew).");
+                                 break; // Inner while loop.
+                             }
+                             catch (GridCacheEntryRemovedException ignored) {
+                                 if (log.isDebugEnabled())
+                                     log.debug("Failed to add candidates because entry was removed (will renew).");
  
-                             // Replace old entry with new one.
-                             entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
+                                 // Replace old entry with new one.
+                                 entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
+                             }
                          }
-                         catch (IgniteCheckedException e) {
-                             onDone(e);
  
-                             return;
-                         }
+                         i++;
                      }
  
-                     i++;
-                 }
+                     try {
+                         proceedMapping(mappings);
+                     }
+                     catch (IgniteCheckedException e) {
+                         onDone(e);
+                     }
  
-                 try {
-                     proceedMapping(mappings);
-                 }
-                 catch (IgniteCheckedException e) {
-                     onDone(e);
+                     onDone(true);
                  }
- 
-                 onDone(true);
              }
          }
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index 1e68d5e,4776963..081c7fc
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@@ -347,7 -381,13 +364,13 @@@ public class GridNearLockRequest extend
  
                  writer.incrementState();
  
 -            case 23:
 +            case 21:
+                 if (!writer.writeBoolean("firstClientReq", firstClientReq))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 24:
                  if (!writer.writeBoolean("hasTransforms", hasTransforms))
                      return false;
  
@@@ -371,7 -411,13 +394,7 @@@
  
                  writer.incrementState();
  
-             case 30:
 -            case 28:
 -                if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit))
 -                    return false;
 -
 -                writer.incrementState();
 -
+             case 29:
                  if (!writer.writeBoolean("retVal", retVal))
                      return false;
  
@@@ -473,7 -527,23 +504,15 @@@
  
                  reader.incrementState();
  
-             case 25:
 -            case 28:
 -                onePhaseCommit = reader.readBoolean("onePhaseCommit");
 -
 -                if (!reader.isLastRead())
 -                    return false;
 -
 -                reader.incrementState();
 -
+             case 29:
+                 retVal = reader.readBoolean("retVal");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 30:
                  subjId = reader.readUuid("subjId");
  
                  if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
index 1fefa4c,62002dc..3b4f7e5
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
@@@ -18,6 -18,8 +18,7 @@@
  package org.apache.ignite.internal.processors.cache.distributed.near;
  
  import org.apache.ignite.*;
 -import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.affinity.*;
  import org.apache.ignite.internal.processors.cache.*;
  import org.apache.ignite.internal.processors.cache.distributed.*;
  import org.apache.ignite.internal.processors.cache.version.*;
@@@ -90,6 -104,31 +97,13 @@@ public class GridNearLockResponse exten
      }
  
      /**
+      * @return {@code True} if client node should remap lock request.
+      */
+     @Nullable public AffinityTopologyVersion clientRemapVersion() {
+         return clientRemapVer;
+     }
+ 
+     /**
 -     * Gets pending versions that are less than {@link #version()}.
 -     *
 -     * @return Pending versions.
 -     */
 -    public Collection<GridCacheVersion> pending() {
 -        return pending;
 -    }
 -
 -    /**
 -     * Sets pending versions that are less than {@link #version()}.
 -     *
 -     * @param pending Pending versions.
 -     */
 -    public void pending(Collection<GridCacheVersion> pending) {
 -        this.pending = pending;
 -    }
 -
 -    /**
       * @return Mini future ID.
       */
      public IgniteUuid miniId() {