You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/24 14:12:35 UTC

[45/50] [abbrv] ignite git commit: Merge branch master ignite-2.0 to ignite-3477

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index ef8150c,c20ed48..bc1c584
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@@ -38,7 -38,7 +38,8 @@@ import org.apache.ignite.cluster.Cluste
  import org.apache.ignite.internal.IgniteInternalFuture;
  import org.apache.ignite.internal.NodeStoppingException;
  import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 +import org.apache.ignite.internal.pagemem.wal.StorageException;
+ import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
  import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
  import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
  import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
@@@ -2699,24 -2600,12 +2618,15 @@@ public class GridDhtAtomicCache<K, V> e
              GridCacheOperation op;
  
              if (putMap != null) {
-                 // If fast mapping, filter primary keys for write to store.
-                 Map<KeyCacheObject, CacheObject> storeMap = req.fastMap() ?
-                     F.view(putMap, new P1<CacheObject>() {
-                         @Override public boolean apply(CacheObject key) {
-                             return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion());
-                         }
-                     }) :
-                     putMap;
- 
                  try {
-                     Map<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> view = F.viewReadOnly(storeMap,
 -                    ctx.store().putAll(null, F.viewReadOnly(putMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() {
 -                        @Override public IgniteBiTuple<CacheObject, GridCacheVersion> apply(CacheObject v) {
 -                            return F.t(v, ver);
 -                        }
 -                    }));
++                    Map<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> view = F.viewReadOnly(putMap,
 +                        new C1<CacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>>() {
 +                            @Override public IgniteBiTuple<? extends CacheObject, GridCacheVersion> apply(CacheObject val) {
 +                                return F.t(val, ver);
 +                            }
 +                        });
 +
 +                    ctx.store().putAll(null, view);
                  }
                  catch (CacheStorePartialUpdateException e) {
                      storeErr = e;
@@@ -3212,11 -3028,10 +3054,10 @@@
       * @param nodeId Sender node ID.
       * @param res Near atomic update response.
       */
 -    @SuppressWarnings("unchecked")
      private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
          if (msgLog.isDebugEnabled())
-             msgLog.debug("Received near atomic update response " +
-                 "[futId=" + res.futureVersion() +
 -            msgLog.debug("Received near atomic update response [futId" + res.futureId() + ", node=" + nodeId + ']');
++            msgLog.debug("Received near atomic update response [futId" + res.futureId() +
 +                ", node=" + nodeId + ']');
  
          res.nodeId(ctx.localNodeId());
  
@@@ -3254,126 -3100,209 +3126,235 @@@
  
          String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
  
 -        for (int i = 0; i < req.size(); i++) {
 -            KeyCacheObject key = req.key(i);
 +        ctx.shared().database().checkpointReadLock();
  
 -            try {
 -                while (true) {
 -                    GridDhtCacheEntry entry = null;
 +        try {
 +            for (int i = 0; i < req.size(); i++) {
 +                KeyCacheObject key = req.key(i);
  
 -                    try {
 -                        entry = entryExx(key);
 +                try {
 +                    while (true) {
 +                        GridDhtCacheEntry entry = null;
  
 -                        CacheObject val = req.value(i);
 -                        CacheObject prevVal = req.previousValue(i);
 +                        try {
 +                            entry = entryExx(key);
  
 -                        EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i);
 -                        Long updateIdx = req.updateCounter(i);
 +                            CacheObject val = req.value(i);
 +                            CacheObject prevVal = req.previousValue(i);
  
 -                        GridCacheOperation op = entryProcessor != null ? TRANSFORM :
 -                            (val != null) ? UPDATE : DELETE;
 +                            EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i);
 +                            Long updateIdx = req.updateCounter(i);
  
 -                        long ttl = req.ttl(i);
 -                        long expireTime = req.conflictExpireTime(i);
 +                            GridCacheOperation op = entryProcessor != null ? TRANSFORM :
 +                                (val != null) ? UPDATE : DELETE;
  
 -                        GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
 -                            ver,
 -                            nodeId,
 -                            nodeId,
 -                            op,
 -                            op == TRANSFORM ? entryProcessor : val,
 -                            op == TRANSFORM ? req.invokeArguments() : null,
 -                            /*write-through*/(ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly())
 -                                && writeThrough() && !req.skipStore(),
 -                            /*read-through*/false,
 -                            /*retval*/false,
 -                            req.keepBinary(),
 -                            /*expiry policy*/null,
 -                            /*event*/true,
 -                            /*metrics*/true,
 -                            /*primary*/false,
 -                            /*check version*/!req.forceTransformBackups(),
 -                            req.topologyVersion(),
 -                            CU.empty0(),
 -                            replicate ? DR_BACKUP : DR_NONE,
 -                            ttl,
 -                            expireTime,
 -                            req.conflictVersion(i),
 -                            false,
 -                            intercept,
 -                            req.subjectId(),
 -                            taskName,
 -                            prevVal,
 -                            updateIdx,
 -                            null);
 +                            long ttl = req.ttl(i);
 +                            long expireTime = req.conflictExpireTime(i);
 +
 +                            GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
 +                                ver,
 +                                nodeId,
 +                                nodeId,
 +                                op,
 +                                op == TRANSFORM ? entryProcessor : val,
 +                                op == TRANSFORM ? req.invokeArguments() : null,
 +                                /*write-through*/(ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly())
 +                                    && writeThrough() && !req.skipStore(),
 +                                /*read-through*/false,
 +                                /*retval*/false,
 +                                req.keepBinary(),
 +                                /*expiry policy*/null,
 +                                /*event*/true,
 +                                /*metrics*/true,
 +                                /*primary*/false,
 +                                /*check version*/!req.forceTransformBackups(),
 +                                req.topologyVersion(),
 +                                CU.empty0(),
 +                                replicate ? DR_BACKUP : DR_NONE,
 +                                ttl,
 +                                expireTime,
 +                                req.conflictVersion(i),
 +                                false,
 +                                intercept,
 +                                req.subjectId(),
 +                                taskName,
 +                                prevVal,
 +                                updateIdx,
 +                                null);
  
 -                        if (updRes.removeVersion() != null)
 -                            ctx.onDeferredDelete(entry, updRes.removeVersion());
 +                            if (updRes.removeVersion() != null)
 +                                ctx.onDeferredDelete(entry, updRes.removeVersion());
  
 -                        entry.onUnlock();
 +                            entry.onUnlock();
  
 -                        break; // While.
 -                    }
 -                    catch (GridCacheEntryRemovedException ignored) {
 -                        if (log.isDebugEnabled())
 -                            log.debug("Got removed entry while updating backup value (will retry): " + key);
 +                            break; // While.
 +                        }
 +                        catch (GridCacheEntryRemovedException ignored) {
 +                            if (log.isDebugEnabled())
 +                                log.debug("Got removed entry while updating backup value (will retry): " + key);
  
-                             entry = null;
-                         }
-                         finally {
-                             if (entry != null)
-                                 ctx.evicts().touch(entry, req.topologyVersion());
-                         }
+                         entry = null;
+                     }
+                     finally {
+                         if (entry != null)
+                             ctx.evicts().touch(entry, req.topologyVersion());
                      }
                  }
-                 catch (GridDhtInvalidPartitionException ignored) {
-                     // Ignore.
-                 }
-                 catch (IgniteCheckedException e) {
-                     res.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e));
-                 }
+             }
+             catch (GridDhtInvalidPartitionException ignored) {
+                 // Ignore.
+             }
+             catch (IgniteCheckedException e) {
 -                IgniteCheckedException err =
 -                    new IgniteCheckedException("Failed to update key on backup node: " + key, e);
++                IgniteCheckedException err = new IgniteCheckedException("Failed to update key on backup node: " + key, e);
+ 
+                 if (nearRes != null)
+                     nearRes.addFailedKey(key, err);
+ 
 -                U.error(log, "Failed to update key on backup node: " + key, e);
++                U.error(log, "Failed to update key on backup node: " + key, e);}
              }
          }
 +        finally {
 +            ctx.shared().database().checkpointReadUnlock();
 +        }
  
-         if (isNearEnabled(cacheCfg))
-             ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, res);
+         GridDhtAtomicUpdateResponse dhtRes = null;
+ 
+         if (isNearEnabled(cacheCfg)) {
+             List<KeyCacheObject> nearEvicted =
+                 ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, nearRes);
+ 
+             if (nearEvicted != null) {
+                 dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(),
+                     req.partition(),
+                     req.futureId(),
+                     ctx.deploymentEnabled());
+ 
+                 dhtRes.nearEvicted(nearEvicted);
+             }
+         }
  
 +        try {
 +            // TODO handle failure: probably drop the node from topology
 +            // TODO fire events only after successful fsync
 +            if (ctx.shared().wal() != null)
 +                ctx.shared().wal().fsync(null);
 +        }
 +        catch (StorageException e) {
-             res.onError(new IgniteCheckedException(e));
++            if (dhtRes != null)
++                dhtRes.onError(new IgniteCheckedException(e));
++
++            if (nearRes != null)
++                nearRes.onClassError(e);
 +        }
 +        catch (IgniteCheckedException e) {
-             res.onError(e);
++            if (dhtRes != null)
++                dhtRes.onError(e);
++
++            if (nearRes != null)
++                nearRes.onClassError(e);
++        }
++
+         if (nearRes != null)
+             sendDhtNearResponse(req, nearRes);
+ 
+         if (dhtRes == null && req.replyWithoutDelay()) {
+             dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(),
+                 req.partition(),
+                 req.futureId(),
+                 ctx.deploymentEnabled());
          }
  
+         if (dhtRes != null)
+             sendDhtPrimaryResponse(nodeId, req, dhtRes);
+         else
+             sendDeferredUpdateResponse(req.partition(), nodeId, req.futureId());
+     }
+ 
+     /**
+      * @param nodeId Primary node ID.
+      * @param req Request.
+      * @param dhtRes Response to send.
+      */
+     private void sendDhtPrimaryResponse(UUID nodeId,
+         GridDhtAtomicAbstractUpdateRequest req,
+         GridDhtAtomicUpdateResponse dhtRes) {
          try {
-             if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode() == FULL_SYNC) {
-                 ctx.io().send(nodeId, res, ctx.ioPolicy());
+             ctx.io().send(nodeId, dhtRes, ctx.ioPolicy());
  
-                 if (msgLog.isDebugEnabled()) {
-                     msgLog.debug("Sent DHT atomic update response [futId=" + req.futureVersion() +
-                         ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
-                 }
+             if (msgLog.isDebugEnabled()) {
+                 msgLog.debug("Sent DHT response [futId=" + req.futureId() +
+                     ", nearFutId=" + req.nearFutureId() +
+                     ", writeVer=" + req.writeVersion() +
+                     ", node=" + nodeId + ']');
              }
-             else {
-                 if (msgLog.isDebugEnabled()) {
-                     msgLog.debug("Will send deferred DHT atomic update response [futId=" + req.futureVersion() +
-                         ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
-                 }
+         }
+         catch (ClusterTopologyCheckedException ignored) {
+             U.warn(msgLog, "Failed to send DHT response, node left [futId=" + req.futureId() +
+                 ", nearFutId=" + req.nearFutureId() +
+                 ", node=" + nodeId + ']');
+         }
+         catch (IgniteCheckedException e) {
+             U.error(msgLog, "Failed to send DHT near response [futId=" + req.futureId() +
+                 ", nearFutId=" + req.nearFutureId() +
+                 ", node=" + nodeId +
+                 ", res=" + dhtRes + ']', e);
+         }
+     }
+ 
+     /**
+      * @param part Partition.
+      * @param primaryId Primary ID.
+      * @param futId Future ID.
+      */
+     private void sendDeferredUpdateResponse(int part, UUID primaryId, long futId) {
+         Map<UUID, GridDhtAtomicDeferredUpdateResponse> resMap = defRes.get();
+ 
+         GridDhtAtomicDeferredUpdateResponse msg = resMap.get(primaryId);
+ 
+         if (msg == null) {
+             msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(),
+                 new GridLongList(DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE));
+ 
+             if (DEFERRED_UPDATE_RESPONSE_TIMEOUT > 0) {
+                 GridTimeoutObject timeoutSnd = new DeferredUpdateTimeout(part, primaryId);
+ 
+                 msg.timeoutSender(timeoutSnd);
+ 
+                 ctx.time().addTimeoutObject(timeoutSnd);
+             }
+ 
+             resMap.put(primaryId, msg);
+         }
+ 
+         GridLongList futIds = msg.futureIds();
+ 
+         assert futIds.size() < DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE : futIds.size();
+ 
+         futIds.add(futId);
  
-                 // No failed keys and sync mode is not FULL_SYNC, thus sending deferred response.
-                 sendDeferredUpdateResponse(nodeId, req.futureVersion());
+         if (futIds.size() >= DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE) {
+             resMap.remove(primaryId);
+ 
+             sendDeferredUpdateResponse(primaryId, msg);
+         }
+     }
+ 
+     /**
+      * @param primaryId Primary ID.
+      * @param msg Message.
+      */
+     private void sendDeferredUpdateResponse(UUID primaryId, GridDhtAtomicDeferredUpdateResponse msg) {
+         try {
+             GridTimeoutObject timeoutSnd = msg.timeoutSender();
+ 
+             if (timeoutSnd != null)
+                 ctx.time().removeTimeoutObject(timeoutSnd);
+ 
+             ctx.io().send(primaryId, msg, ctx.ioPolicy());
+ 
+             if (msgLog.isDebugEnabled()) {
+                 msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureIds() +
+                     ", node=" + primaryId + ']');
              }
          }
          catch (ClusterTopologyCheckedException ignored) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
index 9160865,6811236..9887f55
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
@@@ -140,93 -73,25 +73,27 @@@ public abstract class GridNearAtomicAbs
          boolean retval,
          @Nullable UUID subjId,
          int taskNameHash,
+         boolean mappingKnown,
          boolean skipStore,
          boolean keepBinary,
 +        boolean recovery,
-         boolean clientReq,
          boolean addDepInfo
      ) {
-         assert futVer != null;
- 
-         this.cacheId = cacheId;
-         this.nodeId = nodeId;
-         this.futVer = futVer;
-         this.updateVer = updateVer;
-         this.topVer = topVer;
-         this.syncMode = syncMode;
-         this.op = op;
-         this.subjId = subjId;
-         this.taskNameHash = taskNameHash;
-         this.addDepInfo = addDepInfo;
- 
-         fastMap(fastMap);
-         topologyLocked(topLocked);
-         returnValue(retval);
-         skipStore(skipStore);
-         keepBinary(keepBinary);
-         clientRequest(clientReq);
-         recovery(recovery);
-     }
- 
-     /** {@inheritDoc} */
-     @Override public int lookupIndex() {
-         return CACHE_MSG_IDX;
-     }
- 
-     /**
-      * @return Mapped node ID.
-      */
-     @Override public UUID nodeId() {
-         return nodeId;
-     }
- 
-     /**
-      * @param nodeId Node ID.
-      */
-     @Override public void nodeId(UUID nodeId) {
-         this.nodeId = nodeId;
-     }
- 
-     /**
-      * @return Subject ID.
-      */
-     @Override public UUID subjectId() {
-         return subjId;
-     }
- 
-     /**
-      * @return Task name hash.
-      */
-     @Override public int taskNameHash() {
-         return taskNameHash;
-     }
- 
-     /**
-      * @return Future version.
-      */
-     @Override public GridCacheVersion futureVersion() {
-         return futVer;
-     }
- 
-     /**
-      * @return Update version for fast-map request.
-      */
-     @Override public GridCacheVersion updateVersion() {
-         return updateVer;
-     }
- 
-     /**
-      * @return Topology version.
-      */
-     @Override public AffinityTopologyVersion topologyVersion() {
-         return topVer;
-     }
- 
-     /**
-      * @return Cache write synchronization mode.
-      */
-     @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
-         return syncMode;
+         super(cacheId,
+             nodeId,
+             futId,
+             topVer,
+             topLocked,
+             syncMode,
+             op,
+             retval,
+             subjId,
+             taskNameHash,
+             mappingKnown,
+             skipStore,
+             keepBinary,
++            recovery,
+             addDepInfo);
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index b933186,a43bfb0..4b3ea5bc
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@@ -38,15 -48,177 +48,183 @@@ public abstract class GridNearAtomicAbs
      /** Message index. */
      public static final int CACHE_MSG_IDX = nextIndexId();
  
+     /** Flag indicating that near node waits for primary response. */
+     private static final int NEED_PRIMARY_RES_FLAG_MASK = 0x01;
+ 
+     /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
+     private static final int TOP_LOCKED_FLAG_MASK = 0x02;
+ 
+     /** Skip write-through to a persistent storage. */
+     private static final int SKIP_STORE_FLAG_MASK = 0x04;
+ 
+     /** Keep binary flag. */
+     private static final int KEEP_BINARY_FLAG_MASK = 0x08;
+ 
+     /** Return value flag. */
+     private static final int RET_VAL_FLAG_MASK = 0x10;
+ 
++    /** Recovery value flag. */
++    private static final int RECOVERY_FLAG_MASK = 0x20;
++
+     /** Target node ID. */
+     @GridDirectTransient
+     protected UUID nodeId;
+ 
+     /** Future ID. */
+     protected long futId;
+ 
+     /** Topology version. */
+     protected AffinityTopologyVersion topVer;
+ 
+     /** Write synchronization mode. */
+     protected CacheWriteSynchronizationMode syncMode;
+ 
+     /** Update operation. */
+     protected GridCacheOperation op;
+ 
+     /** Subject ID. */
+     protected UUID subjId;
+ 
+     /** Task name hash. */
+     protected int taskNameHash;
+ 
+     /** Compressed boolean flags. Make sure 'toString' is updated when add new flag. */
+     @GridToStringExclude
+     protected byte flags;
+ 
+     /** */
+     @GridDirectTransient
+     private GridNearAtomicUpdateResponse res;
+ 
      /**
-      * @return Mapped node ID.
+      *
       */
-     public abstract UUID nodeId();
+     public GridNearAtomicAbstractUpdateRequest() {
+         // No-op.
+     }
  
      /**
+      * Constructor.
+      *
+      * @param cacheId Cache ID.
       * @param nodeId Node ID.
+      * @param futId Future ID.
+      * @param topVer Topology version.
+      * @param topLocked Topology locked flag.
+      * @param syncMode Synchronization mode.
+      * @param op Cache update operation.
+      * @param retval Return value required flag.
+      * @param subjId Subject ID.
+      * @param taskNameHash Task name hash code.
+      * @param needPrimaryRes {@code True} if near node waits for primary response.
+      * @param skipStore Skip write-through to a persistent storage.
+      * @param keepBinary Keep binary flag.
+      * @param addDepInfo Deployment info flag.
+      */
+     protected GridNearAtomicAbstractUpdateRequest(
+         int cacheId,
+         UUID nodeId,
+         long futId,
+         @NotNull AffinityTopologyVersion topVer,
+         boolean topLocked,
+         CacheWriteSynchronizationMode syncMode,
+         GridCacheOperation op,
+         boolean retval,
+         @Nullable UUID subjId,
+         int taskNameHash,
+         boolean needPrimaryRes,
+         boolean skipStore,
+         boolean keepBinary,
++        boolean recovery,
+         boolean addDepInfo
+     ) {
+         this.cacheId = cacheId;
+         this.nodeId = nodeId;
+         this.futId = futId;
+         this.topVer = topVer;
+         this.syncMode = syncMode;
+         this.op = op;
+         this.subjId = subjId;
+         this.taskNameHash = taskNameHash;
+         this.addDepInfo = addDepInfo;
+ 
+         if (needPrimaryRes)
+             needPrimaryResponse(true);
+         if (topLocked)
+             topologyLocked(true);
+         if (retval)
+             returnValue(true);
+         if (skipStore)
+             skipStore(true);
+         if (keepBinary)
+             keepBinary(true);
++        if (recovery)
++            recovery(true);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public final AffinityTopologyVersion topologyVersion() {
+         return topVer;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public final int lookupIndex() {
+         return CACHE_MSG_IDX;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public final boolean addDeploymentInfo() {
+         return addDepInfo;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public final IgniteLogger messageLogger(GridCacheSharedContext ctx) {
+         return ctx.atomicMessageLogger();
+     }
+ 
+     /**
+      * @return {@code True} if near node is able to initialize update mapping locally.
+      */
+     boolean initMappingLocally() {
+         return !needPrimaryResponse() && fullSync();
+     }
+ 
+     /**
+      * @return {@code True} if near node waits for primary response.
+      */
+     boolean needPrimaryResponse() {
+         return isFlag(NEED_PRIMARY_RES_FLAG_MASK);
+     }
+ 
+     /**
+      * @param needRes {@code True} if near node waits for primary response.
+      */
+     void needPrimaryResponse(boolean needRes) {
+         setFlag(needRes, NEED_PRIMARY_RES_FLAG_MASK);
+     }
+ 
+     /**
+      * @return {@code True} if update is processed in {@link CacheWriteSynchronizationMode#FULL_SYNC} mode.
+      */
+     boolean fullSync() {
+         assert syncMode != null;
+ 
+         return syncMode == CacheWriteSynchronizationMode.FULL_SYNC;
+     }
+ 
+     /**
+      * @return Task name hash code.
+      */
+     public int taskNameHash() {
+         return taskNameHash;
+     }
+ 
+     /**
+      * @return Update operation.
       */
-     public abstract void nodeId(UUID nodeId);
+     public GridCacheOperation operation() {
+         return op;
+     }
  
      /**
       * @return Subject ID.
@@@ -111,38 -328,51 +334,65 @@@
      /**
       * @return Keep binary flag.
       */
-     public abstract boolean keepBinary();
+     public final boolean keepBinary() {
+         return isFlag(KEEP_BINARY_FLAG_MASK);
+     }
  
      /**
-      * @return Recovery flag.
+      * @param val Keep binary flag.
       */
-     public abstract boolean recovery();
+     public void keepBinary(boolean val) {
+         setFlag(val, KEEP_BINARY_FLAG_MASK);
+     }
  
      /**
-      * @return Update operation.
++     * @return Recovery flag.
 +     */
-     public abstract GridCacheOperation operation();
++    public final boolean recovery() {
++        return isFlag(RECOVERY_FLAG_MASK);
++    }
 +
 +    /**
-      * @return Optional arguments for entry processor.
++     * @param val Recovery flag.
 +     */
-     @Nullable public abstract Object[] invokeArguments();
++    public void recovery(boolean val) {
++        setFlag(val, RECOVERY_FLAG_MASK);
++    }
 +
 +    /**
-      * @return Flag indicating whether this request contains primary keys.
+      * Sets flag mask.
+      *
+      * @param flag Set or clear.
+      * @param mask Mask.
       */
-     public abstract boolean hasPrimary();
+     private void setFlag(boolean flag, int mask) {
+         flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+     }
  
      /**
-      * @param res Response.
-      * @return {@code True} if current response was {@code null}.
+      * Reads flag mask.
+      *
+      * @param mask Mask to read.
+      * @return Flag value.
       */
-     public abstract boolean onResponse(GridNearAtomicUpdateResponse res);
+     private boolean isFlag(int mask) {
+         return (flags & mask) != 0;
+     }
  
      /**
-      * @return Response.
+      * @return Expiry policy.
+      */
+     public abstract ExpiryPolicy expiry();
+ 
+     /**
+      * @return Filter.
+      */
+     @Nullable public abstract CacheEntryPredicate[] filter();
+ 
+     /**
+      * @return Optional arguments for entry processor.
       */
-     @Nullable public abstract GridNearAtomicUpdateResponse response();
+     @Nullable public abstract Object[] invokeArguments();
  
      /**
       * @param key Key to add.

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index dcaf246,ade9976..4a94c22
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@@ -212,26 -152,26 +151,28 @@@ public class GridNearAtomicFullUpdateRe
          @Nullable CacheEntryPredicate[] filter,
          @Nullable UUID subjId,
          int taskNameHash,
+         boolean needPrimaryRes,
          boolean skipStore,
          boolean keepBinary,
 +        boolean recovery,
-         boolean clientReq,
          boolean addDepInfo,
          int maxEntryCnt
      ) {
-         assert futVer != null;
- 
-         this.cacheId = cacheId;
-         this.nodeId = nodeId;
-         this.futVer = futVer;
-         this.fastMap = fastMap;
-         this.updateVer = updateVer;
- 
-         this.topVer = topVer;
-         this.topLocked = topLocked;
-         this.syncMode = syncMode;
-         this.op = op;
-         this.retval = retval;
+         super(cacheId,
+             nodeId,
+             futId,
+             topVer,
+             topLocked,
+             syncMode,
+             op,
+             retval,
+             subjId,
+             taskNameHash,
+             needPrimaryRes,
+             skipStore,
+             keepBinary,
++            recovery,
+             addDepInfo);
          this.expiryPlc = expiryPlc;
          this.invokeArgs = invokeArgs;
          this.filter = filter;

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
index 39b6ab2,c32501a..30197cd
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
@@@ -88,10 -84,9 +84,10 @@@ public class GridNearAtomicSingleUpdate
          @Nullable CacheEntryPredicate[] filter,
          @Nullable UUID subjId,
          int taskNameHash,
+         boolean needPrimaryRes,
          boolean skipStore,
          boolean keepBinary,
 +        boolean recovery,
-         boolean clientReq,
          boolean addDepInfo
      ) {
          super(
@@@ -107,10 -100,9 +101,10 @@@
              retval,
              subjId,
              taskNameHash,
+             needPrimaryRes,
              skipStore,
              keepBinary,
 +            recovery,
-             clientReq,
              addDepInfo
          );
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index dbff89a,c2372d1..6401fbd
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@@ -108,8 -102,21 +104,22 @@@ public class GridNearAtomicSingleUpdate
          int remapCnt,
          boolean waitTopFut
      ) {
-         super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash,
-             skipStore, keepBinary, recovery, remapCnt, waitTopFut);
+         super(cctx,
+             cache,
+             syncMode,
+             op,
+             invokeArgs,
+             retval,
+             rawRetval,
+             expiryPlc,
+             filter,
+             subjId,
+             taskNameHash,
+             skipStore,
+             keepBinary,
++            recovery,
+             remapCnt,
+             waitTopFut);
  
          assert subjId != null;
  
@@@ -378,49 -433,21 +436,21 @@@
      /** {@inheritDoc} */
      @Override protected void mapOnTopology() {
          AffinityTopologyVersion topVer;
-         GridCacheVersion futVer;
- 
-         cache.topology().readLock();
  
-         try {
-             if (cache.topology().stopping()) {
-                 onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
-                     cache.name()));
+         if (cache.topology().stopping()) {
+             onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+                 cache.name()));
  
-                 return;
-             }
- 
-             GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
- 
-             if (fut.isDone()) {
-                 Throwable err = fut.validateCache(cctx, recovery, /*read*/false, key, null);
- 
-                 if (err != null) {
-                     onDone(err);
+             return;
+         }
  
-                     return;
-                 }
+         GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
  
-                 topVer = fut.topologyVersion();
+         if (fut.isDone()) {
 -            Throwable err = fut.validateCache(cctx);
++            Throwable err = fut.validateCache(cctx, recovery, /*read*/false, key, null);
  
-                 futVer = addAtomicFuture(topVer);
-             }
-             else {
-                 if (waitTopFut) {
-                     assert !topLocked : this;
- 
-                     fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                         @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                             cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                                 @Override public void run() {
-                                     mapOnTopology();
-                                 }
-                             });
-                         }
-                     });
-                 }
-                 else
-                     onDone(new GridCacheTryPutFailedException());
+             if (err != null) {
+                 onDone(err);
  
                  return;
              }
@@@ -564,10 -628,9 +631,10 @@@
                      invokeArgs,
                      subjId,
                      taskNameHash,
+                     needPrimaryRes,
                      skipStore,
                      keepBinary,
 +                    recovery,
-                     cctx.kernalContext().clientNode(),
                      cctx.deploymentEnabled());
              }
              else {
@@@ -585,10 -646,9 +650,10 @@@
                          retval,
                          subjId,
                          taskNameHash,
+                         needPrimaryRes,
                          skipStore,
                          keepBinary,
 +                        recovery,
-                         cctx.kernalContext().clientNode(),
                          cctx.deploymentEnabled());
                  }
                  else {
@@@ -606,10 -664,9 +669,10 @@@
                          filter,
                          subjId,
                          taskNameHash,
+                         needPrimaryRes,
                          skipStore,
                          keepBinary,
 +                        recovery,
-                         cctx.kernalContext().clientNode(),
                          cctx.deploymentEnabled());
                  }
              }
@@@ -631,10 -686,9 +692,10 @@@
                  filter,
                  subjId,
                  taskNameHash,
+                 needPrimaryRes,
                  skipStore,
                  keepBinary,
 +                recovery,
-                 cctx.kernalContext().clientNode(),
                  cctx.deploymentEnabled(),
                  1);
          }

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
index 02cfd91,298ea05..bbcad12
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
@@@ -106,10 -102,9 +102,10 @@@ public class GridNearAtomicSingleUpdate
          @Nullable Object[] invokeArgs,
          @Nullable UUID subjId,
          int taskNameHash,
+         boolean needPrimaryRes,
          boolean skipStore,
          boolean keepBinary,
 +        boolean recovery,
-         boolean clientReq,
          boolean addDepInfo
      ) {
          super(
@@@ -125,15 -118,15 +119,16 @@@
              retval,
              subjId,
              taskNameHash,
+             needPrimaryRes,
              skipStore,
              keepBinary,
 +            recovery,
-             clientReq,
              addDepInfo
          );
-         this.invokeArgs = invokeArgs;
  
          assert op == TRANSFORM : op;
+ 
+         this.invokeArgs = invokeArgs;
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 18b6118,14c70aa..94373c4
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@@ -100,18 -93,14 +93,15 @@@ public class GridNearAtomicSingleUpdate
          boolean retval,
          @Nullable UUID subjId,
          int taskNameHash,
+         boolean needPrimaryRes,
          boolean skipStore,
          boolean keepBinary,
 +        boolean recovery,
-         boolean clientReq,
          boolean addDepInfo
      ) {
-         super(
-             cacheId,
+         super(cacheId,
              nodeId,
-             futVer,
-             fastMap,
-             updateVer,
+             futId,
              topVer,
              topLocked,
              syncMode,
@@@ -119,12 -108,10 +109,12 @@@
              retval,
              subjId,
              taskNameHash,
+             needPrimaryRes,
              skipStore,
              keepBinary,
 -            addDepInfo);
 +            recovery,
-             clientReq,
 +            addDepInfo
 +        );
      }
  
      /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index f24a9b1,a44ccf9..c5824d5
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@@ -490,49 -641,21 +642,21 @@@ public class GridNearAtomicUpdateFutur
      /** {@inheritDoc} */
      @Override protected void mapOnTopology() {
          AffinityTopologyVersion topVer;
-         GridCacheVersion futVer;
- 
-         cache.topology().readLock();
- 
-         try {
-             if (cache.topology().stopping()) {
-                 onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
-                     cache.name()));
  
-                 return;
-             }
+         if (cache.topology().stopping()) {
+             onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+                 cache.name()));
  
-             GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
- 
-             if (fut.isDone()) {
-                 Throwable err = fut.validateCache(cctx, recovery, false, null, keys);
+             return;
+         }
  
-                 if (err != null) {
-                     onDone(err);
+         GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
  
-                     return;
-                 }
+         if (fut.isDone()) {
 -            Throwable err = fut.validateCache(cctx);
++            Throwable err = fut.validateCache(cctx, recovery, false, null, keys);
  
-                 topVer = fut.topologyVersion();
- 
-                 futVer = addAtomicFuture(topVer);
-             }
-             else {
-                 if (waitTopFut) {
-                     assert !topLocked : this;
- 
-                     fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                         @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                             cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                                 @Override public void run() {
-                                     mapOnTopology();
-                                 }
-                             });
-                         }
-                     });
-                 }
-                 else
-                     onDone(new GridCacheTryPutFailedException());
+             if (err != null) {
+                 onDone(err);
  
                  return;
              }
@@@ -826,50 -1036,44 +1037,45 @@@
                  throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
                      "(all partition nodes left the grid).");
  
-             int i = 0;
- 
-             for (int n = 0; n < affNodes.size(); n++) {
-                 ClusterNode affNode = affNodes.get(n);
- 
-                 if (affNode == null)
-                     throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                         "(all partition nodes left the grid).");
- 
-                 UUID nodeId = affNode.id();
- 
-                 GridNearAtomicFullUpdateRequest mapped = pendingMappings.get(nodeId);
- 
-                 if (mapped == null) {
-                     mapped = new GridNearAtomicFullUpdateRequest(
-                         cctx.cacheId(),
-                         nodeId,
-                         futVer,
-                         fastMap,
-                         updVer,
-                         topVer,
-                         topLocked,
-                         syncMode,
-                         op,
-                         retval,
-                         expiryPlc,
-                         invokeArgs,
-                         filter,
-                         subjId,
-                         taskNameHash,
-                         skipStore,
-                         keepBinary,
-                         recovery,
-                         cctx.kernalContext().clientNode(),
-                         cctx.deploymentEnabled(),
-                         keys.size());
- 
-                     pendingMappings.put(nodeId, mapped);
-                 }
+             ClusterNode primary = nodes.get(0);
+ 
+             boolean needPrimaryRes = !mappingKnown || primary.isLocal();
+ 
+             UUID nodeId = primary.id();
  
-                 mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
+             PrimaryRequestState mapped = pendingMappings.get(nodeId);
  
-                 i++;
+             if (mapped == null) {
+                 GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
+                     cctx.cacheId(),
+                     nodeId,
+                     futId,
+                     topVer,
+                     topLocked,
+                     syncMode,
+                     op,
+                     retval,
+                     expiryPlc,
+                     invokeArgs,
+                     filter,
+                     subjId,
+                     taskNameHash,
+                     needPrimaryRes,
+                     skipStore,
+                     keepBinary,
++                    recovery,
+                     cctx.deploymentEnabled(),
+                     keys.size());
+ 
+                 mapped = new PrimaryRequestState(req, nodes, false);
+ 
+                 pendingMappings.put(nodeId, mapped);
              }
+ 
+             if (mapped.req.initMappingLocally())
+                 mapped.addMapping(nodes);
+ 
+             mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer);
          }
  
          return pendingMappings;
@@@ -959,10 -1164,9 +1166,10 @@@
              filter,
              subjId,
              taskNameHash,
+             needPrimaryRes,
              skipStore,
              keepBinary,
 +            recovery,
-             cctx.kernalContext().clientNode(),
              cctx.deploymentEnabled(),
              1);
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 955b8ba,8b52ba8..1c761c8
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@@ -351,18 -320,10 +320,13 @@@ public class GridNearAtomicUpdateRespon
       * @param e Error cause.
       */
      public synchronized void addFailedKey(KeyCacheObject key, Throwable e) {
 +        assert key != null;
 +        assert e != null;
 +
-         if (failedKeys == null)
-             failedKeys = new ConcurrentLinkedQueue<>();
- 
-         failedKeys.add(key);
+         if (errs == null)
+             errs = new UpdateErrors();
  
-         if (err == null)
-             err = new IgniteCheckedException("Failed to update keys on primary node.");
- 
-         err.addSuppressed(e);
+         errs.addFailedKey(key, e);
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index d86dc91,03bbfe0..f922d09
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@@ -208,9 -211,9 +208,11 @@@ public class GridDhtColocatedCache<K, V
  
          final CacheOperationContext opCtx = ctx.operationContextPerCall();
  
++        final boolean recovery = opCtx != null && opCtx.recovery();
++
          if (tx != null && !tx.implicit() && !skipTx) {
              return asyncOp(tx, new AsyncOp<V>() {
-                 @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+                 @Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
                      IgniteInternalFuture<Map<Object, Object>>  fut = tx.getAllAsync(ctx,
                          readyTopVer,
                          Collections.singleton(ctx.toCacheKeyObject(key)),
@@@ -218,6 -221,6 +220,7 @@@
                          skipVals,
                          false,
                          opCtx != null && opCtx.skipStore(),
++                        recovery,
                          needVer);
  
                      return fut.chain(new CX1<IgniteInternalFuture<Map<Object, Object>>, V>() {
@@@ -275,7 -277,6 +278,7 @@@
          @Nullable UUID subjId,
          String taskName,
          final boolean deserializeBinary,
-         boolean recovery,
++        final boolean recovery,
          final boolean skipVals,
          boolean canRemap,
          final boolean needVer
@@@ -302,6 -303,6 +305,7 @@@
                          skipVals,
                          false,
                          opCtx != null && opCtx.skipStore(),
++                        recovery,
                          needVer);
                  }
              }, opCtx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 31cff03,79c15fb..56dc322
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@@ -161,8 -161,8 +161,11 @@@ public final class GridDhtColocatedLock
      private final boolean keepBinary;
  
      /** */
 +    private final boolean recovery;
 +
++    /** */
+     private int miniId;
+ 
      /**
       * @param cctx Registry.
       * @param keys Keys to lock.
@@@ -917,38 -914,38 +920,36 @@@
                                          !topLocked &&
                                          (tx == null || !tx.hasRemoteLocks());
  
 -                                        first = false;
 -                                    }
 -
 -                                    assert !implicitTx() && !implicitSingleTx() : tx;
 +                                    first = false;
 +                                }
  
-                                 req = new GridNearLockRequest(
-                                     cctx.cacheId(),
-                                     topVer,
-                                     cctx.nodeId(),
-                                     threadId,
-                                     futId,
-                                     lockVer,
-                                     inTx(),
-                                     implicitTx(),
-                                     implicitSingleTx(),
-                                     read,
-                                     retval,
-                                     isolation(),
-                                     isInvalidate(),
-                                     timeout,
-                                     mappedKeys.size(),
-                                     inTx() ? tx.size() : mappedKeys.size(),
-                                     inTx() && tx.syncMode() == FULL_SYNC,
-                                     inTx() ? tx.subjectId() : null,
-                                     inTx() ? tx.taskNameHash() : 0,
-                                     read ? createTtl : -1L,
 -                                    req = new GridNearLockRequest(
++                                    assert !implicitTx() && !implicitSingleTx() : tx;req = new GridNearLockRequest(
+                                         cctx.cacheId(),
+                                         topVer,
+                                         cctx.nodeId(),
+                                         threadId,
+                                         futId,
+                                         lockVer,
+                                         inTx(),
+                                         read,
+                                         retval,
+                                         isolation(),
+                                         isInvalidate(),
+                                         timeout,
+                                         mappedKeys.size(),
+                                         inTx() ? tx.size() : mappedKeys.size(),
+                                         inTx() && tx.syncMode() == FULL_SYNC,
+                                         inTx() ? tx.subjectId() : null,
+                                         inTx() ? tx.taskNameHash() : 0,
+                                         read ? createTtl : -1L,
                                          read ? accessTtl : -1L,
 -                                        skipStore,
 -                                        keepBinary,
 -                                        clientFirst,
 -                                        cctx.deploymentEnabled());
 +                                    skipStore,
 +                                    keepBinary,
 +                                    clientFirst,
 +                                    cctx.deploymentEnabled());
  
 -                                    mapping.request(req);
 -                                }
 +                                mapping.request(req);
 +                            }
  
                              distributedKeys.add(key);
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index c33dc7b,5eacc36..829b29d
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -33,9 -33,9 +34,10 @@@ import java.util.concurrent.atomic.Atom
  import java.util.concurrent.atomic.AtomicReference;
  import java.util.concurrent.locks.ReadWriteLock;
  import org.apache.ignite.IgniteCheckedException;
+ import org.apache.ignite.internal.IgniteNeedReconnectException;
  import org.apache.ignite.IgniteLogger;
  import org.apache.ignite.IgniteSystemProperties;
 +import org.apache.ignite.cache.PartitionLossPolicy;
  import org.apache.ignite.cluster.ClusterNode;
  import org.apache.ignite.events.CacheEvent;
  import org.apache.ignite.events.DiscoveryEvent;
@@@ -52,9 -51,7 +55,10 @@@ import org.apache.ignite.internal.pagem
  import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
  import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
  import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
 +import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
 +import org.apache.ignite.internal.processors.cache.ClusterState;
 +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+ import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
  import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
  import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
  import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@@ -74,7 -68,7 +78,8 @@@ import org.apache.ignite.internal.util.
  import org.apache.ignite.internal.util.tostring.GridToStringInclude;
  import org.apache.ignite.internal.util.typedef.CI1;
  import org.apache.ignite.internal.util.typedef.F;
 +import org.apache.ignite.internal.util.typedef.T2;
+ import org.apache.ignite.internal.util.typedef.X;
  import org.apache.ignite.internal.util.typedef.internal.CU;
  import org.apache.ignite.internal.util.typedef.internal.LT;
  import org.apache.ignite.internal.util.typedef.internal.S;
@@@ -98,9 -88,8 +103,9 @@@ import static org.apache.ignite.interna
  /**
   * Future for exchanging partition maps.
   */
 +@SuppressWarnings({"TypeMayBeWeakened", "unchecked"})
  public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion>
-     implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture {
+     implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture, CachePartitionExchangeWorkerTask {
      /** */
      public static final int DUMP_PENDING_OBJECTS_THRESHOLD =
          IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10);

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index 12b5204,79c71b3..6aa7441
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@@ -391,43 -360,37 +372,43 @@@ public class GridNearGetRequest extend
  
                  writer.incrementState();
  
-             case 11:
+             case 10:
 -                if (!writer.writeBoolean("reload", reload))
 +                if (!writer.writeBoolean("recovery", recovery))
                      return false;
  
                  writer.incrementState();
  
-             case 12:
+             case 11:
 -                if (!writer.writeBoolean("skipVals", skipVals))
 +                if (!writer.writeBoolean("reload", reload))
                      return false;
  
                  writer.incrementState();
  
-             case 13:
+             case 12:
 -                if (!writer.writeUuid("subjId", subjId))
 +                if (!writer.writeBoolean("skipVals", skipVals))
                      return false;
  
                  writer.incrementState();
  
-             case 14:
+             case 13:
 -                if (!writer.writeInt("taskNameHash", taskNameHash))
 +                if (!writer.writeUuid("subjId", subjId))
                      return false;
  
                  writer.incrementState();
  
-             case 15:
+             case 14:
 -                if (!writer.writeMessage("topVer", topVer))
 +                if (!writer.writeInt("taskNameHash", taskNameHash))
                      return false;
  
                  writer.incrementState();
  
-             case 16:
+             case 15:
 +                if (!writer.writeMessage("topVer", topVer))
 +                    return false;
 +
 +                writer.incrementState();
 +
 +            case 17:
                  if (!writer.writeMessage("ver", ver))
                      return false;
  
@@@ -513,55 -468,47 +486,55 @@@
  
                  reader.incrementState();
  
-             case 11:
+             case 10:
 -                reload = reader.readBoolean("reload");
 +                recovery = reader.readBoolean("recovery");
  
                  if (!reader.isLastRead())
                      return false;
  
                  reader.incrementState();
  
-             case 12:
+             case 11:
 -                skipVals = reader.readBoolean("skipVals");
 +                reload = reader.readBoolean("reload");
  
                  if (!reader.isLastRead())
                      return false;
  
                  reader.incrementState();
  
-             case 13:
+             case 12:
 -                subjId = reader.readUuid("subjId");
 +                skipVals = reader.readBoolean("skipVals");
  
                  if (!reader.isLastRead())
                      return false;
  
                  reader.incrementState();
  
-             case 14:
+             case 13:
 -                taskNameHash = reader.readInt("taskNameHash");
 +                subjId = reader.readUuid("subjId");
  
                  if (!reader.isLastRead())
                      return false;
  
                  reader.incrementState();
  
-             case 15:
+             case 14:
 -                topVer = reader.readMessage("topVer");
 +                taskNameHash = reader.readInt("taskNameHash");
  
                  if (!reader.isLastRead())
                      return false;
  
                  reader.incrementState();
  
-             case 16:
+             case 15:
 +                topVer = reader.readMessage("topVer");
 +
 +                if (!reader.isLastRead())
 +                    return false;
 +
 +                reader.incrementState();
 +
 +            case 17:
                  ver = reader.readMessage("ver");
  
                  if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index d8f9222,1948df0..0900bac
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@@ -164,9 -164,9 +164,12 @@@ public final class GridNearLockFuture e
      /** Keep binary context flag. */
      private final boolean keepBinary;
  
 +    /** Recovery mode context flag. */
 +    private final boolean recovery;
 +
+     /** */
+     private int miniId;
+ 
      /**
       * @param cctx Registry.
       * @param keys Keys to lock.

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index ec4b9e5,48b508b..e519707
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@@ -398,67 -376,31 +376,31 @@@ public class GridNearLockRequest extend
  
                  writer.incrementState();
  
 -            case 24:
 +            case 25:
-                 if (!writer.writeBoolean("firstClientReq", firstClientReq))
+                 if (!writer.writeByte("flags", flags))
                      return false;
  
                  writer.incrementState();
  
 -            case 25:
 +            case 26:
-                 if (!writer.writeBoolean("hasTransforms", hasTransforms))
+                 if (!writer.writeInt("miniId", miniId))
                      return false;
  
                  writer.incrementState();
  
 -            case 26:
 +            case 27:
-                 if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx))
-                     return false;
- 
-                 writer.incrementState();
- 
-             case 28:
-                 if (!writer.writeBoolean("implicitTx", implicitTx))
-                     return false;
- 
-                 writer.incrementState();
- 
-             case 29:
-                 if (!writer.writeIgniteUuid("miniId", miniId))
-                     return false;
- 
-                 writer.incrementState();
- 
-             case 30:
-                 if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit))
-                     return false;
- 
-                 writer.incrementState();
- 
-             case 31:
-                 if (!writer.writeBoolean("retVal", retVal))
-                     return false;
- 
-                 writer.incrementState();
- 
-             case 32:
                  if (!writer.writeUuid("subjId", subjId))
                      return false;
  
                  writer.incrementState();
  
-             case 33:
-                 if (!writer.writeBoolean("syncCommit", syncCommit))
-                     return false;
- 
-                 writer.incrementState();
- 
-             case 34:
 -            case 27:
++            case 28:
                  if (!writer.writeInt("taskNameHash", taskNameHash))
                      return false;
  
                  writer.incrementState();
  
-             case 35:
 -            case 28:
++            case 29:
                  if (!writer.writeMessage("topVer", topVer))
                      return false;
  
@@@ -512,63 -454,23 +454,23 @@@
  
                  reader.incrementState();
  
 -            case 24:
 +            case 25:
-                 firstClientReq = reader.readBoolean("firstClientReq");
+                 flags = reader.readByte("flags");
  
                  if (!reader.isLastRead())
                      return false;
  
                  reader.incrementState();
  
 -            case 25:
 +            case 26:
-                 hasTransforms = reader.readBoolean("hasTransforms");
+                 miniId = reader.readInt("miniId");
  
                  if (!reader.isLastRead())
                      return false;
  
                  reader.incrementState();
  
 -            case 26:
 +            case 27:
-                 implicitSingleTx = reader.readBoolean("implicitSingleTx");
- 
-                 if (!reader.isLastRead())
-                     return false;
- 
-                 reader.incrementState();
- 
-             case 28:
-                 implicitTx = reader.readBoolean("implicitTx");
- 
-                 if (!reader.isLastRead())
-                     return false;
- 
-                 reader.incrementState();
- 
-             case 29:
-                 miniId = reader.readIgniteUuid("miniId");
- 
-                 if (!reader.isLastRead())
-                     return false;
- 
-                 reader.incrementState();
- 
-             case 30:
-                 onePhaseCommit = reader.readBoolean("onePhaseCommit");
- 
-                 if (!reader.isLastRead())
-                     return false;
- 
-                 reader.incrementState();
- 
-             case 31:
-                 retVal = reader.readBoolean("retVal");
- 
-                 if (!reader.isLastRead())
-                     return false;
- 
-                 reader.incrementState();
- 
-             case 32:
                  subjId = reader.readUuid("subjId");
  
                  if (!reader.isLastRead())
@@@ -576,15 -478,7 +478,7 @@@
  
                  reader.incrementState();
  
-             case 33:
-                 syncCommit = reader.readBoolean("syncCommit");
- 
-                 if (!reader.isLastRead())
-                     return false;
- 
-                 reader.incrementState();
- 
-             case 34:
 -            case 27:
++            case 28:
                  taskNameHash = reader.readInt("taskNameHash");
  
                  if (!reader.isLastRead())
@@@ -592,7 -486,7 +486,7 @@@
  
                  reader.incrementState();
  
-             case 35:
 -            case 28:
++            case 29:
                  topVer = reader.readMessage("topVer");
  
                  if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index f09b6c8,976f05f..1d610c7
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@@ -62,6 -62,19 +62,19 @@@ public abstract class GridNearOptimisti
          }
  
          if (topVer != null) {
+             try {
 -                IgniteCheckedException err = tx.txState().validateTopology(cctx, topologyReadLock());
++                IgniteCheckedException err = tx.txState().validateTopology(cctx, false, topologyReadLock());
+ 
+                 if (err != null) {
+                     onDone(err);
+ 
+                     return;
+                 }
+             }
+             finally {
+                 topologyReadUnlock();
+             }
+ 
              tx.topologyVersion(topVer);
  
              cctx.mvcc().addFuture(this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
index cd6e275,994172b..de69b21
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
@@@ -44,23 -42,20 +42,23 @@@ public class GridNearSingleGetRequest e
      private static final long serialVersionUID = 0L;
  
      /** */
-     public static final int READ_THROUGH_FLAG_MASK = 0x01;
+     private static final int READ_THROUGH_FLAG_MASK = 0x01;
  
      /** */
-     public static final int SKIP_VALS_FLAG_MASK = 0x02;
+     private static final int SKIP_VALS_FLAG_MASK = 0x02;
  
      /** */
-     public static final int ADD_READER_FLAG_MASK = 0x04;
+     private static final int ADD_READER_FLAG_MASK = 0x04;
  
      /** */
-     public static final int NEED_VER_FLAG_MASK = 0x08;
+     private static final int NEED_VER_FLAG_MASK = 0x08;
  
      /** */
-     public static final int NEED_ENTRY_INFO_FLAG_MASK = 0x10;
+     private static final int NEED_ENTRY_INFO_FLAG_MASK = 0x10;
  
 +    /** */
 +    public static final int RECOVERY_FLAG_MASK = 0x20;
 +
      /** Future ID. */
      private long futId;
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 1bb39e2,5ad05b0..1468e8a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@@ -43,7 -43,7 +43,6 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest;
  import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
  import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
--import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
  import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
  import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
  import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
@@@ -121,7 -121,6 +120,7 @@@ public class GridNearTransactionalCache
          @Nullable UUID subjId,
          String taskName,
          final boolean deserializeBinary,
-         boolean recovery,
++        final boolean recovery,
          final boolean skipVals,
          boolean canRemap,
          final boolean needVer
@@@ -150,6 -149,6 +149,7 @@@
                          skipVals,
                          false,
                          skipStore,
++                        recovery,
                          needVer);
                  }
              }, opCtx);