You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/22 13:31:04 UTC

[1/2] ignite git commit: Restore clock mode.

Repository: ignite
Updated Branches:
  refs/heads/ignite-2.0-clock [created] 0015962a4


http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFastMapUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFastMapUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFastMapUpdateFuture.java
new file mode 100644
index 0000000..9048e85
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFastMapUpdateFuture.java
@@ -0,0 +1,1026 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
+import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+
+/**
+ * DHT atomic cache near update future.
+ */
+public class GridNearAtomicFastMapUpdateFuture extends GridNearAtomicAbstractUpdateFuture {
+    /** Completion future for a particular topology version. */
+    private GridFutureAdapter<Void> topCompleteFut;
+
+    /** */
+    private final boolean fastMap = true;
+
+    /** Topology version when got mapping error. */
+    private AffinityTopologyVersion mapErrTopVer;
+
+    /** Keys */
+    private Collection<?> keys;
+
+    /** Values. */
+    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+    private Collection<?> vals;
+
+    /** Conflict put values. */
+    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+    private Collection<GridCacheDrInfo> conflictPutVals;
+
+    /** Conflict remove values. */
+    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+    private Collection<GridCacheVersion> conflictRmvVals;
+
+    /** Mappings if operations is mapped to more than one node. */
+    @GridToStringInclude
+    private Map<UUID, GridNearAtomicFullUpdateRequest> mappings;
+
+    /** Keys to remap. */
+    private Collection<KeyCacheObject> remapKeys;
+
+    /** Not null is operation is mapped to single node. */
+    private GridNearAtomicFullUpdateRequest singleReq;
+
+    /** */
+    private int resCnt;
+
+    /**
+     * @param cctx Cache context.
+     * @param cache Cache instance.
+     * @param syncMode Write synchronization mode.
+     * @param op Update operation.
+     * @param keys Keys to update.
+     * @param vals Values or transform closure.
+     * @param invokeArgs Optional arguments for entry processor.
+     * @param conflictPutVals Conflict put values (optional).
+     * @param conflictRmvVals Conflict remove values (optional).
+     * @param retval Return value require flag.
+     * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
+     * @param expiryPlc Expiry policy explicitly specified for cache operation.
+     * @param filter Entry filter.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param skipStore Skip store flag.
+     * @param keepBinary Keep binary flag.
+     * @param remapCnt Maximum number of retries.
+     * @param waitTopFut If {@code false} does not wait for affinity change future.
+     */
+    GridNearAtomicFastMapUpdateFuture(
+        GridCacheContext cctx,
+        GridDhtAtomicCache cache,
+        CacheWriteSynchronizationMode syncMode,
+        GridCacheOperation op,
+        Collection<?> keys,
+        @Nullable Collection<?> vals,
+        @Nullable Object[] invokeArgs,
+        @Nullable Collection<GridCacheDrInfo> conflictPutVals,
+        @Nullable Collection<GridCacheVersion> conflictRmvVals,
+        final boolean retval,
+        final boolean rawRetval,
+        @Nullable ExpiryPolicy expiryPlc,
+        final CacheEntryPredicate[] filter,
+        UUID subjId,
+        int taskNameHash,
+        boolean skipStore,
+        boolean keepBinary,
+        int remapCnt,
+        boolean waitTopFut
+    ) {
+        super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash,
+            skipStore, keepBinary, remapCnt, waitTopFut);
+
+        assert vals == null || vals.size() == keys.size();
+        assert conflictPutVals == null || conflictPutVals.size() == keys.size();
+        assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
+        assert subjId != null;
+        assert cache.isFastMap(filter, op);
+
+        this.keys = keys;
+        this.vals = vals;
+        this.conflictPutVals = conflictPutVals;
+        this.conflictRmvVals = conflictRmvVals;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Long id() {
+        synchronized (mux) {
+            return futId;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean fastMap() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        GridNearAtomicUpdateResponse res = null;
+
+        GridNearAtomicFullUpdateRequest req;
+
+        synchronized (mux) {
+            if (singleReq != null)
+                req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
+            else
+                req = mappings != null ? mappings.get(nodeId) : null;
+
+            if (req != null && req.response() == null) {
+                res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+                    nodeId,
+                    req.futureId(),
+                    req.partition(),
+                    false,
+                    cctx.deploymentEnabled());
+
+                ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
+                    "before response is received: " + nodeId);
+
+                e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+
+                res.addFailedKeys(req.keys(), e);
+            }
+        }
+
+        if (res != null) {
+            if (msgLog.isDebugEnabled()) {
+                msgLog.debug("Near update fut, node left [futId=" + req.futureId() +
+                    ", writeVer=" + req.updateVersion() +
+                    ", node=" + nodeId + ']');
+            }
+
+            onPrimaryResponse(nodeId, res, true);
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+        // Wait fast-map near atomic update futures in CLOCK mode.
+        if (fastMap) {
+            GridFutureAdapter<Void> fut;
+
+            synchronized (mux) {
+                if (this.topVer != AffinityTopologyVersion.ZERO && this.topVer.compareTo(topVer) < 0) {
+                    if (topCompleteFut == null)
+                        topCompleteFut = new GridFutureAdapter<>();
+
+                    fut = topCompleteFut;
+                }
+                else
+                    fut = null;
+            }
+
+            if (fut != null && isDone()) {
+                fut.onDone();
+
+                return null;
+            }
+
+            return fut;
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ConstantConditions")
+    @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
+        assert res == null || res instanceof GridCacheReturn;
+
+        GridCacheReturn ret = (GridCacheReturn)res;
+
+        Object retval =
+            res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ?
+                cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success();
+
+        if (op == TRANSFORM && retval == null)
+            retval = Collections.emptyMap();
+
+        if (super.onDone(retval, err)) {
+            Long futId = onFutureDone();
+
+            if (futId != null)
+                cctx.mvcc().removeAtomicFuture(futId);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
+        assert false;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+    @Override public void onPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+        GridNearAtomicFullUpdateRequest req;
+
+        AffinityTopologyVersion remapTopVer = null;
+
+        GridCacheReturn opRes0 = null;
+        CachePartialUpdateCheckedException err0 = null;
+
+        boolean rcvAll;
+
+        GridFutureAdapter<?> fut0 = null;
+
+        synchronized (mux) {
+            if (futId == null || futId != res.futureId())
+                return;
+
+            if (singleReq != null) {
+                if (!singleReq.nodeId().equals(nodeId))
+                    return;
+
+                req = singleReq;
+
+                singleReq = null;
+
+                rcvAll = true;
+            }
+            else {
+                req = mappings != null ? mappings.get(nodeId) : null;
+
+                if (req != null && req.onResponse(res)) {
+                    resCnt++;
+
+                    rcvAll = mappings.size() == resCnt;
+                }
+                else
+                    return;
+            }
+
+            assert req != null && req.topologyVersion().equals(topVer) : req;
+
+            if (res.remapTopologyVersion() != null) {
+                assert !fastMap || cctx.kernalContext().clientNode();
+
+                if (remapKeys == null)
+                    remapKeys = U.newHashSet(req.size());
+
+                remapKeys.addAll(req.keys());
+
+                if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
+                    mapErrTopVer = req.topologyVersion();
+            }
+            else if (res.error() != null) {
+                if (res.failedKeys() != null) {
+                    if (err == null)
+                        err = new CachePartialUpdateCheckedException(
+                            "Failed to update keys (retry update if possible).");
+
+                    Collection<Object> keys = new ArrayList<>(res.failedKeys().size());
+
+                    for (KeyCacheObject key : res.failedKeys())
+                        keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
+
+                    err.add(keys, res.error(), req.topologyVersion());
+                }
+            }
+            else {
+                if (!req.fastMap() || req.hasPrimary()) {
+                    GridCacheReturn ret = res.returnValue();
+
+                    if (op == TRANSFORM) {
+                        if (ret != null) {
+                            assert ret.value() == null || ret.value() instanceof Map : ret.value();
+
+                            if (ret.value() != null) {
+                                if (opRes != null)
+                                    opRes.mergeEntryProcessResults(ret);
+                                else
+                                    opRes = ret;
+                            }
+                        }
+                    }
+                    else
+                        opRes = ret;
+                }
+            }
+
+            if (rcvAll) {
+                if (remapKeys != null) {
+                    assert mapErrTopVer != null;
+
+                    remapTopVer = cctx.shared().exchange().topologyVersion();
+                }
+                else {
+                    if (err != null &&
+                        X.hasCause(err, CachePartialUpdateCheckedException.class) &&
+                        X.hasCause(err, ClusterTopologyCheckedException.class) &&
+                        storeFuture() &&
+                        --remapCnt > 0) {
+                        ClusterTopologyCheckedException topErr =
+                            X.cause(err, ClusterTopologyCheckedException.class);
+
+                        if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+                            CachePartialUpdateCheckedException cause =
+                                X.cause(err, CachePartialUpdateCheckedException.class);
+
+                            assert cause != null && cause.topologyVersion() != null : err;
+
+                            remapTopVer =
+                                new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
+
+                            err = null;
+
+                            Collection<Object> failedKeys = cause.failedKeys();
+
+                            remapKeys = new ArrayList<>(failedKeys.size());
+
+                            for (Object key : failedKeys)
+                                remapKeys.add(cctx.toCacheKeyObject(key));
+
+                            updVer = null;
+                        }
+                    }
+                }
+
+                if (remapTopVer == null) {
+                    err0 = err;
+                    opRes0 = opRes;
+                }
+                else {
+                    fut0 = topCompleteFut;
+
+                    topCompleteFut = null;
+
+                    cctx.mvcc().removeAtomicFuture(futId);
+
+                    futId = null;
+                    topVer = AffinityTopologyVersion.ZERO;
+                }
+            }
+        }
+
+        if (res.error() != null && res.failedKeys() == null) {
+            onDone(res.error());
+
+            return;
+        }
+
+        if (rcvAll && nearEnabled) {
+            if (mappings != null) {
+                for (GridNearAtomicFullUpdateRequest req0 : mappings.values()) {
+                    GridNearAtomicUpdateResponse res0 = req0.response();
+
+                    assert res0 != null : req0;
+
+                    updateNear(req0, res0);
+                }
+            }
+            else if (!nodeErr)
+                updateNear(req, res);
+        }
+
+        if (remapTopVer != null) {
+            if (fut0 != null)
+                fut0.onDone();
+
+            if (!waitTopFut) {
+                onDone(new GridCacheTryPutFailedException());
+
+                return;
+            }
+
+            if (topLocked) {
+                assert !F.isEmpty(remapKeys) : remapKeys;
+
+                CachePartialUpdateCheckedException e =
+                    new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+
+                ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
+                    "Failed to update keys, topology changed while execute atomic update inside transaction.");
+
+                cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+
+                e.add(remapKeys, cause);
+
+                onDone(e);
+
+                return;
+            }
+
+            IgniteInternalFuture<AffinityTopologyVersion> fut =
+                cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+
+            if (fut == null)
+                fut = new GridFinishedFuture<>(remapTopVer);
+
+            fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                    cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                        @Override public void run() {
+                            mapOnTopology();
+                        }
+                    });
+                }
+            });
+
+            return;
+        }
+
+        if (rcvAll)
+            onDone(opRes0, err0);
+    }
+
+    /**
+     * Updates near cache.
+     *
+     * @param req Update request.
+     * @param res Update response.
+     */
+    private void updateNear(GridNearAtomicFullUpdateRequest req, GridNearAtomicUpdateResponse res) {
+        assert nearEnabled;
+
+        if (res.remapTopologyVersion() != null || !req.hasPrimary())
+            return;
+
+        GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
+
+        near.processNearAtomicUpdateResponse(req, res);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void mapOnTopology() {
+        AffinityTopologyVersion topVer;
+        Long futId;
+
+        cache.topology().readLock();
+
+        try {
+            if (cache.topology().stopping()) {
+                onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+                    cache.name()));
+
+                return;
+            }
+
+            GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
+
+            if (fut.isDone()) {
+                Throwable err = fut.validateCache(cctx);
+
+                if (err != null) {
+                    onDone(err);
+
+                    return;
+                }
+
+                topVer = fut.topologyVersion();
+
+                futId = addAtomicFuture(topVer);
+            }
+            else {
+                if (waitTopFut) {
+                    assert !topLocked : this;
+
+                    fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                                @Override public void run() {
+                                    mapOnTopology();
+                                }
+                            });
+                        }
+                    });
+                }
+                else
+                    onDone(new GridCacheTryPutFailedException());
+
+                return;
+            }
+        }
+        finally {
+            cache.topology().readUnlock();
+        }
+
+        if (futId != null)
+            map(topVer, futId, remapKeys);
+    }
+
+    /**
+     * Sends messages to remote nodes and updates local cache.
+     *
+     * @param mappings Mappings to send.
+     */
+    private void doUpdate(Map<UUID, GridNearAtomicFullUpdateRequest> mappings) {
+        UUID locNodeId = cctx.localNodeId();
+
+        GridNearAtomicFullUpdateRequest locUpdate = null;
+
+        // Send messages to remote nodes first, then run local update.
+        for (GridNearAtomicFullUpdateRequest req : mappings.values()) {
+            if (locNodeId.equals(req.nodeId())) {
+                assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
+                    ", req=" + req + ']';
+
+                locUpdate = req;
+            }
+            else {
+                try {
+                    cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+
+                    if (msgLog.isDebugEnabled()) {
+                        msgLog.debug("Near update fut, sent request [futId=" + req.futureId() +
+                            ", writeVer=" + req.updateVersion() +
+                            ", node=" + req.nodeId() + ']');
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    if (msgLog.isDebugEnabled()) {
+                        msgLog.debug("Near update fut, failed to send request [futId=" + req.futureId() +
+                            ", writeVer=" + req.updateVersion() +
+                            ", node=" + req.nodeId() +
+                            ", err=" + e + ']');
+                    }
+
+                    onSendError(req, e);
+                }
+            }
+        }
+
+        if (locUpdate != null) {
+            cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
+                new GridDhtAtomicCache.UpdateReplyClosure() {
+                    @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
+                        onPrimaryResponse(res.nodeId(), res, false);
+                    }
+                });
+        }
+
+        if (syncMode == FULL_ASYNC)
+            onDone(new GridCacheReturn(cctx, true, true, null, true));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void map(AffinityTopologyVersion topVer, Long futVer) {
+        map(topVer, futVer, null);
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @param futId Future ID.
+     * @param remapKeys Keys to remap.
+     */
+    void map(AffinityTopologyVersion topVer,
+        Long futId,
+        @Nullable Collection<KeyCacheObject> remapKeys) {
+        Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
+
+        if (F.isEmpty(topNodes)) {
+            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+                "left the grid)."));
+
+            return;
+        }
+
+        GridCacheVersion updVer;
+
+        // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+        if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
+            updVer = this.updVer;
+
+            if (updVer == null) {
+                updVer = cctx.versions().next(topVer);
+
+                if (log.isDebugEnabled())
+                    log.debug("Assigned fast-map version for update on near node: " + updVer);
+            }
+        }
+        else
+            updVer = null;
+
+        Exception err = null;
+        GridNearAtomicFullUpdateRequest singleReq0 = null;
+        Map<UUID, GridNearAtomicFullUpdateRequest> mappings0 = null;
+
+        int size = keys.size();
+
+        try {
+            if (size == 1 && !fastMap) {
+                assert remapKeys == null || remapKeys.size() == 1;
+
+                singleReq0 = mapSingleUpdate(topVer, futId, updVer);
+            }
+            else {
+                Map<UUID, GridNearAtomicFullUpdateRequest> pendingMappings = mapUpdate(topNodes,
+                    topVer,
+                    futId,
+                    updVer,
+                    remapKeys);
+
+                if (pendingMappings.size() == 1)
+                    singleReq0 = F.firstValue(pendingMappings);
+                else {
+                    if (syncMode == PRIMARY_SYNC) {
+                        mappings0 = U.newHashMap(pendingMappings.size());
+
+                        for (GridNearAtomicFullUpdateRequest req : pendingMappings.values()) {
+                            if (req.hasPrimary())
+                                mappings0.put(req.nodeId(), req);
+                        }
+                    }
+                    else
+                        mappings0 = pendingMappings;
+
+                    assert !mappings0.isEmpty() || size == 0 : this;
+                }
+            }
+
+            synchronized (mux) {
+                assert futId.equals(this.futId) || (this.isDone() && this.error() != null);
+                assert this.topVer == topVer;
+
+                this.updVer = updVer;
+
+                resCnt = 0;
+
+                singleReq = singleReq0;
+                mappings = mappings0;
+
+                this.remapKeys = null;
+            }
+        }
+        catch (Exception e) {
+            err = e;
+        }
+
+        if (err != null) {
+            onDone(err);
+
+            return;
+        }
+
+        // Optimize mapping for single key.
+        if (singleReq0 != null)
+            sendSingleRequest(singleReq0.nodeId(), singleReq0);
+        else {
+            assert mappings0 != null;
+
+            if (size == 0)
+                onDone(new GridCacheReturn(cctx, true, true, null, true));
+            else
+                doUpdate(mappings0);
+        }
+    }
+
+    /**
+     * @return Future version.
+     */
+    private Long onFutureDone() {
+        Long futId0;
+
+        GridFutureAdapter<Void> fut0;
+
+        synchronized (mux) {
+            fut0 = topCompleteFut;
+
+            topCompleteFut = null;
+
+            futId0 = futId;
+
+            futId = null;
+        }
+
+        if (fut0 != null)
+            fut0.onDone();
+
+        return futId0;
+    }
+
+    /**
+     * @param topNodes Cache nodes.
+     * @param topVer Topology version.
+     * @param futId Future ID.
+     * @param updVer Update version.
+     * @param remapKeys Keys to remap.
+     * @return Mapping.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    private Map<UUID, GridNearAtomicFullUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
+        AffinityTopologyVersion topVer,
+        Long futId,
+        @Nullable GridCacheVersion updVer,
+        @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
+        Iterator<?> it = null;
+
+        if (vals != null)
+            it = vals.iterator();
+
+        Iterator<GridCacheDrInfo> conflictPutValsIt = null;
+
+        if (conflictPutVals != null)
+            conflictPutValsIt = conflictPutVals.iterator();
+
+        Iterator<GridCacheVersion> conflictRmvValsIt = null;
+
+        if (conflictRmvVals != null)
+            conflictRmvValsIt = conflictRmvVals.iterator();
+
+        Map<UUID, GridNearAtomicFullUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
+
+        // Create mappings first, then send messages.
+        for (Object key : keys) {
+            if (key == null)
+                throw new NullPointerException("Null key.");
+
+            Object val;
+            GridCacheVersion conflictVer;
+            long conflictTtl;
+            long conflictExpireTime;
+
+            if (vals != null) {
+                val = it.next();
+                conflictVer = null;
+                conflictTtl = CU.TTL_NOT_CHANGED;
+                conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+
+                if (val == null)
+                    throw new NullPointerException("Null value.");
+            }
+            else if (conflictPutVals != null) {
+                GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
+
+                val = conflictPutVal.valueEx();
+                conflictVer = conflictPutVal.version();
+                conflictTtl = conflictPutVal.ttl();
+                conflictExpireTime = conflictPutVal.expireTime();
+            }
+            else if (conflictRmvVals != null) {
+                val = null;
+                conflictVer = conflictRmvValsIt.next();
+                conflictTtl = CU.TTL_NOT_CHANGED;
+                conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+            }
+            else {
+                val = null;
+                conflictVer = null;
+                conflictTtl = CU.TTL_NOT_CHANGED;
+                conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+            }
+
+            if (val == null && op != GridCacheOperation.DELETE)
+                continue;
+
+            KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+
+            if (remapKeys != null && !remapKeys.contains(cacheKey))
+                continue;
+
+            if (op != TRANSFORM)
+                val = cctx.toCacheObject(val);
+            else
+                val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
+
+            List<ClusterNode> affNodes = mapKey(cacheKey, topVer);
+
+            if (affNodes.isEmpty())
+                throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+                    "(all partition nodes left the grid).");
+
+            int i = 0;
+
+            for (int n = 0; n < affNodes.size(); n++) {
+                ClusterNode affNode = affNodes.get(n);
+
+                if (affNode == null)
+                    throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+                        "(all partition nodes left the grid).");
+
+                UUID nodeId = affNode.id();
+
+                GridNearAtomicFullUpdateRequest mapped = pendingMappings.get(nodeId);
+
+                if (mapped == null) {
+                    mapped = new GridNearAtomicFullUpdateRequest(
+                        cctx.cacheId(),
+                        nodeId,
+                        futId,
+                        topVer,
+                        topLocked,
+                        syncMode,
+                        op,
+                        retval,
+                        expiryPlc,
+                        invokeArgs,
+                        filter,
+                        subjId,
+                        taskNameHash,
+                        /*fastMap*/true,
+                        cctx.kernalContext().clientNode(),
+                        /*needPrimaryRes*/false,
+                        skipStore,
+                        keepBinary,
+                        cctx.deploymentEnabled(),
+                        keys.size());
+
+                    mapped.updateVersion(updVer);
+
+                    pendingMappings.put(nodeId, mapped);
+                }
+
+                mapped.addUpdateEntry(cacheKey,
+                    val,
+                    conflictTtl,
+                    conflictExpireTime,
+                    conflictVer,
+                    i == 0);
+
+                i++;
+            }
+        }
+
+        return pendingMappings;
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @param futId Future ID.
+     * @param updVer Update version.
+     * @return Request.
+     * @throws Exception If failed.
+     */
+    private GridNearAtomicFullUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
+        Long futId,
+        @Nullable GridCacheVersion updVer) throws Exception {
+        Object key = F.first(keys);
+
+        Object val;
+        GridCacheVersion conflictVer;
+        long conflictTtl;
+        long conflictExpireTime;
+
+        if (vals != null) {
+            // Regular PUT.
+            val = F.first(vals);
+            conflictVer = null;
+            conflictTtl = CU.TTL_NOT_CHANGED;
+            conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+        }
+        else if (conflictPutVals != null) {
+            // Conflict PUT.
+            GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
+
+            val = conflictPutVal.valueEx();
+            conflictVer = conflictPutVal.version();
+            conflictTtl = conflictPutVal.ttl();
+            conflictExpireTime = conflictPutVal.expireTime();
+        }
+        else if (conflictRmvVals != null) {
+            // Conflict REMOVE.
+            val = null;
+            conflictVer = F.first(conflictRmvVals);
+            conflictTtl = CU.TTL_NOT_CHANGED;
+            conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+        }
+        else {
+            // Regular REMOVE.
+            val = null;
+            conflictVer = null;
+            conflictTtl = CU.TTL_NOT_CHANGED;
+            conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+        }
+
+        // We still can get here if user pass map with single element.
+        if (key == null)
+            throw new NullPointerException("Null key.");
+
+        if (val == null && op != GridCacheOperation.DELETE)
+            throw new NullPointerException("Null value.");
+
+        KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+
+        if (op != TRANSFORM)
+            val = cctx.toCacheObject(val);
+        else
+            val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
+
+        ClusterNode primary = cctx.affinity().primaryByPartition(cacheKey.partition(), topVer);
+
+        if (primary == null)
+            throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+                "left the grid).");
+
+        GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
+            cctx.cacheId(),
+            primary.id(),
+            futId,
+            topVer,
+            topLocked,
+            syncMode,
+            op,
+            retval,
+            expiryPlc,
+            invokeArgs,
+            filter,
+            subjId,
+            taskNameHash,
+            /*fastMap*/true,
+            cctx.kernalContext().clientNode(),
+            /*needPrimaryRes*/false,
+            skipStore,
+            keepBinary,
+            cctx.deploymentEnabled(),
+            1);
+
+        req.updateVersion(updVer);
+
+        req.addUpdateEntry(cacheKey,
+            val,
+            conflictTtl,
+            conflictExpireTime,
+            conflictVer,
+            true);
+
+        return req;
+    }
+
+    /**
+     * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
+     * node and send updates in parallel to all participating nodes.
+     *
+     * @param key Key to map.
+     * @param topVer Topology version to map.
+     * @return Collection of nodes to which key is mapped.
+     */
+    private List<ClusterNode> mapKey(KeyCacheObject key, AffinityTopologyVersion topVer) {
+        GridCacheAffinityManager affMgr = cctx.affinity();
+
+        // If we can send updates in parallel - do it.
+        return fastMap ? cctx.topology().nodes(affMgr.partition(key), topVer) :
+            Collections.singletonList(affMgr.primaryByKey(key, topVer));
+    }
+
+    /** {@inheritDoc} */
+    public String toString() {
+        synchronized (mux) {
+            return S.toString(GridNearAtomicFastMapUpdateFuture.class, this, super.toString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index c381333..ede1a05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -151,6 +151,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         @Nullable CacheEntryPredicate[] filter,
         @Nullable UUID subjId,
         int taskNameHash,
+        boolean fastMap,
+        boolean clientReq,
         boolean needPrimaryRes,
         boolean skipStore,
         boolean keepBinary,
@@ -167,6 +169,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
             retval,
             subjId,
             taskNameHash,
+            fastMap,
+            clientReq,
             needPrimaryRes,
             skipStore,
             keepBinary,
@@ -189,7 +193,11 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         @Nullable Object val,
         long conflictTtl,
         long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer) {
+        @Nullable GridCacheVersion conflictVer,
+        boolean primary) {
+        if (primary)
+            hasPrimary(true);
+
         EntryProcessor<Object, Object, Object> entryProcessor = null;
 
         if (op == TRANSFORM) {
@@ -445,55 +453,55 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         }
 
         switch (writer.state()) {
-            case 10:
+            case 11:
                 if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
                     return false;
 
                 writer.incrementState();
 
-            case 11:
+            case 12:
                 if (!writer.writeMessage("conflictTtls", conflictTtls))
                     return false;
 
                 writer.incrementState();
 
-            case 12:
+            case 13:
                 if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 13:
+            case 14:
                 if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
-            case 14:
+            case 15:
                 if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 15:
+            case 16:
                 if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 16:
+            case 17:
                 if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
-            case 17:
+            case 18:
                 if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 18:
+            case 19:
                 if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
                     return false;
 
@@ -515,7 +523,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
             return false;
 
         switch (reader.state()) {
-            case 10:
+            case 11:
                 conflictExpireTimes = reader.readMessage("conflictExpireTimes");
 
                 if (!reader.isLastRead())
@@ -523,7 +531,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 11:
+            case 12:
                 conflictTtls = reader.readMessage("conflictTtls");
 
                 if (!reader.isLastRead())
@@ -531,7 +539,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 12:
+            case 13:
                 conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -539,7 +547,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 13:
+            case 14:
                 entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
 
                 if (!reader.isLastRead())
@@ -547,7 +555,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 14:
+            case 15:
                 expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
 
                 if (!reader.isLastRead())
@@ -555,7 +563,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 15:
+            case 16:
                 filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
 
                 if (!reader.isLastRead())
@@ -563,7 +571,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 16:
+            case 17:
                 invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
 
                 if (!reader.isLastRead())
@@ -571,7 +579,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 17:
+            case 18:
                 keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -579,7 +587,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 18:
+            case 19:
                 vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -611,7 +619,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 19;
+        return 20;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
index 78582b0..ab55cf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
@@ -83,6 +83,8 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
         @Nullable CacheEntryPredicate[] filter,
         @Nullable UUID subjId,
         int taskNameHash,
+        boolean fastMap,
+        boolean clientReq,
         boolean needPrimaryRes,
         boolean skipStore,
         boolean keepBinary,
@@ -99,6 +101,8 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
             retval,
             subjId,
             taskNameHash,
+            fastMap,
+            clientReq,
             needPrimaryRes,
             skipStore,
             keepBinary,
@@ -166,7 +170,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
         }
 
         switch (writer.state()) {
-            case 12:
+            case 13:
                 if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
                     return false;
 
@@ -188,7 +192,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
             return false;
 
         switch (reader.state()) {
-            case 12:
+            case 13:
                 filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
 
                 if (!reader.isLastRead())
@@ -208,7 +212,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 13;
+        return 14;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 930c4af..da590ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -119,6 +119,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             waitTopFut);
 
         assert subjId != null;
+        assert !cache.isFastMap(filter, op);
 
         this.key = key;
         this.val = val;
@@ -344,6 +345,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class);
 
                 if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+                    updVer = null;
+
                     CachePartialUpdateCheckedException cause =
                         X.cause(err, CachePartialUpdateCheckedException.class);
 
@@ -474,11 +477,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             return;
         }
 
-        map(topVer);
+        map(topVer, null);
     }
 
     /** {@inheritDoc} */
-    @Override protected void map(AffinityTopologyVersion topVer) {
+    @Override protected void map(AffinityTopologyVersion topVer, Long ignore) {
         long futId = cctx.mvcc().atomicFutureId();
 
         Exception err = null;
@@ -550,9 +553,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 err0 = err;
                 remapTopVer0 = onAllReceived();
             }
-            else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY){
+            else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY)
                 checkReq = new GridNearAtomicCheckUpdateRequest(reqState.req);
-            }
             else
                 return true;
         }
@@ -632,6 +634,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                     invokeArgs,
                     subjId,
                     taskNameHash,
+                    /*fastMap*/false,
+                    cctx.kernalContext().clientNode(),
                     needPrimaryRes,
                     skipStore,
                     keepBinary,
@@ -650,6 +654,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                         retval,
                         subjId,
                         taskNameHash,
+                        /*fastMap*/false,
+                        cctx.kernalContext().clientNode(),
                         needPrimaryRes,
                         skipStore,
                         keepBinary,
@@ -668,6 +674,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                         filter,
                         subjId,
                         taskNameHash,
+                        /*fastMap*/false,
+                        cctx.kernalContext().clientNode(),
                         needPrimaryRes,
                         skipStore,
                         keepBinary,
@@ -690,6 +698,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 filter,
                 subjId,
                 taskNameHash,
+                /*fastMap*/false,
+                cctx.kernalContext().clientNode(),
                 needPrimaryRes,
                 skipStore,
                 keepBinary,
@@ -697,11 +707,14 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 1);
         }
 
+        initUpdateVersion(req, topVer);
+
         req.addUpdateEntry(cacheKey,
             val,
             CU.TTL_NOT_CHANGED,
             CU.EXPIRE_TIME_CALCULATE,
-            null);
+            null,
+            true);
 
         return new PrimaryRequestState(req, nodes, true);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
index f8b3984..614d6cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
@@ -101,6 +101,8 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
         @Nullable Object[] invokeArgs,
         @Nullable UUID subjId,
         int taskNameHash,
+        boolean fastMap,
+        boolean clientReq,
         boolean needPrimaryRes,
         boolean skipStore,
         boolean keepBinary,
@@ -117,6 +119,8 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
             retval,
             subjId,
             taskNameHash,
+            fastMap,
+            clientReq,
             needPrimaryRes,
             skipStore,
             keepBinary,
@@ -139,7 +143,8 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
         @Nullable Object val,
         long conflictTtl,
         long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer) {
+        @Nullable GridCacheVersion conflictVer,
+        boolean primary) {
         assert conflictTtl < 0 : conflictTtl;
         assert conflictExpireTime < 0 : conflictExpireTime;
         assert conflictVer == null : conflictVer;
@@ -235,13 +240,13 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
         }
 
         switch (writer.state()) {
-            case 12:
+            case 13:
                 if (!writer.writeByteArray("entryProcessorBytes", entryProcessorBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 13:
+            case 14:
                 if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
@@ -263,7 +268,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
             return false;
 
         switch (reader.state()) {
-            case 12:
+            case 13:
                 entryProcessorBytes = reader.readByteArray("entryProcessorBytes");
 
                 if (!reader.isLastRead())
@@ -271,7 +276,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
 
                 reader.incrementState();
 
-            case 13:
+            case 14:
                 invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
 
                 if (!reader.isLastRead())
@@ -286,7 +291,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 14;
+        return 15;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index b9a1fc6..1db6c74 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -92,6 +92,8 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
         boolean retval,
         @Nullable UUID subjId,
         int taskNameHash,
+        boolean fastMap,
+        boolean clientReq,
         boolean needPrimaryRes,
         boolean skipStore,
         boolean keepBinary,
@@ -107,6 +109,8 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
             retval,
             subjId,
             taskNameHash,
+            fastMap,
+            clientReq,
             needPrimaryRes,
             skipStore,
             keepBinary,
@@ -120,18 +124,13 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
         return key.partition();
     }
 
-    /**
-     * @param key Key to add.
-     * @param val Optional update value.
-     * @param conflictTtl Conflict TTL (optional).
-     * @param conflictExpireTime Conflict expire time (optional).
-     * @param conflictVer Conflict version (optional).
-     */
+    /** {@inheritDoc} */
     @Override public void addUpdateEntry(KeyCacheObject key,
         @Nullable Object val,
         long conflictTtl,
         long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer) {
+        @Nullable GridCacheVersion conflictVer,
+        boolean primary) {
         assert op != TRANSFORM;
         assert val != null || op == DELETE;
         assert conflictTtl < 0 : conflictTtl;
@@ -257,13 +256,13 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
         }
 
         switch (writer.state()) {
-            case 10:
+            case 11:
                 if (!writer.writeMessage("key", key))
                     return false;
 
                 writer.incrementState();
 
-            case 11:
+            case 12:
                 if (!writer.writeMessage("val", val))
                     return false;
 
@@ -285,7 +284,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
             return false;
 
         switch (reader.state()) {
-            case 10:
+            case 11:
                 key = reader.readMessage("key");
 
                 if (!reader.isLastRead())
@@ -293,7 +292,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
 
                 reader.incrementState();
 
-            case 11:
+            case 12:
                 val = reader.readMessage("val");
 
                 if (!reader.isLastRead())
@@ -321,7 +320,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 12;
+        return 13;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index a44ccf9..5f51770 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -142,6 +142,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         assert conflictPutVals == null || conflictPutVals.size() == keys.size();
         assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
         assert subjId != null;
+        assert !cache.isFastMap(filter, op);
 
         this.keys = keys;
         this.vals = vals;
@@ -552,6 +553,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class);
 
                 if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+                    updVer = null;
+
                     CachePartialUpdateCheckedException cause =
                         X.cause(err, CachePartialUpdateCheckedException.class);
 
@@ -682,7 +685,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             return;
         }
 
-        map(topVer, remapKeys);
+        doMap(topVer, remapKeys);
     }
 
     /**
@@ -749,15 +752,15 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     }
 
     /** {@inheritDoc} */
-    @Override protected void map(AffinityTopologyVersion topVer) {
-        map(topVer, null);
+    @Override protected void map(AffinityTopologyVersion topVer, Long ignore) {
+        doMap(topVer, null);
     }
 
     /**
      * @param topVer Topology version.
      * @param remapKeys Keys to remap.
      */
-    void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
+    private void doMap(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
         if (F.isEmpty(topNodes)) {
@@ -1059,12 +1062,16 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     filter,
                     subjId,
                     taskNameHash,
+                    /*fastMap*/false,
+                    cctx.kernalContext().clientNode(),
                     needPrimaryRes,
                     skipStore,
                     keepBinary,
                     cctx.deploymentEnabled(),
                     keys.size());
 
+                initUpdateVersion(req, topVer);
+
                 mapped = new PrimaryRequestState(req, nodes, false);
 
                 pendingMappings.put(nodeId, mapped);
@@ -1073,7 +1080,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             if (mapped.req.initMappingLocally())
                 mapped.addMapping(nodes);
 
-            mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer);
+            mapped.req.addUpdateEntry(cacheKey,
+                val,
+                conflictTtl,
+                conflictExpireTime,
+                conflictVer,
+                true);
         }
 
         return pendingMappings;
@@ -1164,17 +1176,22 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             filter,
             subjId,
             taskNameHash,
+            /*fastMap*/false,
+            cctx.kernalContext().clientNode(),
             needPrimaryRes,
             skipStore,
             keepBinary,
             cctx.deploymentEnabled(),
             1);
 
+        initUpdateVersion(req, topVer);
+
         req.addUpdateEntry(cacheKey,
             val,
             conflictTtl,
             conflictExpireTime,
-            conflictVer);
+            conflictVer,
+            true);
 
         return new PrimaryRequestState(req, nodes, true);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 62aecd1..3b86f96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -142,7 +142,10 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
         List<Integer> nearValsIdxs = res.nearValuesIndexes();
         List<Integer> skipped = res.skippedIndexes();
 
-        GridCacheVersion ver = res.nearVersion();
+        GridCacheVersion ver = req.updateVersion();
+
+        if (ver == null)
+            ver = res.nearVersion();
 
         assert ver != null : "Failed to find version [req=" + req + ", res=" + res + ']';
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
index a12b6b9..6da34b1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
@@ -94,6 +94,13 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
     /**
      * @throws Exception If failed.
      */
+    public void testPartitionedClock() throws Exception {
+        checkMessages(false, CLOCK);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testPartitionedPrimary() throws Exception {
         checkMessages(false, PRIMARY);
     }
@@ -101,6 +108,13 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
     /**
      * @throws Exception If failed.
      */
+    public void testClientClock() throws Exception {
+        checkMessages(true, CLOCK);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testClientPrimary() throws Exception {
         checkMessages(true, PRIMARY);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index 6d90d0e..7927299 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -51,6 +51,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
@@ -834,6 +835,7 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
         CacheWriteSynchronizationMode writeSync) {
         CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
 
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
         ccfg.setName(TEST_CACHE);
         ccfg.setAtomicityMode(ATOMIC);
         ccfg.setWriteSynchronizationMode(writeSync);


[2/2] ignite git commit: Restore clock mode.

Posted by sb...@apache.org.
Restore clock mode.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0015962a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0015962a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0015962a

Branch: refs/heads/ignite-2.0-clock
Commit: 0015962a43cc48d5b7857503a0f84a43518cbd7f
Parents: 827befb
Author: sboikov <sb...@gridgain.com>
Authored: Wed Mar 22 15:20:14 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Mar 22 16:30:49 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |   12 +-
 .../GridDhtAtomicAbstractUpdateFuture.java      |   62 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  544 +++++++---
 .../atomic/GridDhtAtomicSingleUpdateFuture.java |    6 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |    6 +-
 ...idNearAtomicAbstractSingleUpdateRequest.java |    4 +
 .../GridNearAtomicAbstractUpdateFuture.java     |   72 +-
 .../GridNearAtomicAbstractUpdateRequest.java    |  120 +-
 .../GridNearAtomicFastMapUpdateFuture.java      | 1026 ++++++++++++++++++
 .../atomic/GridNearAtomicFullUpdateRequest.java |   48 +-
 ...GridNearAtomicSingleUpdateFilterRequest.java |   10 +-
 .../GridNearAtomicSingleUpdateFuture.java       |   23 +-
 ...GridNearAtomicSingleUpdateInvokeRequest.java |   17 +-
 .../GridNearAtomicSingleUpdateRequest.java      |   25 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   29 +-
 .../distributed/near/GridNearAtomicCache.java   |    5 +-
 .../GridCacheAtomicMessageCountSelfTest.java    |   14 +
 .../atomic/IgniteCacheAtomicProtocolTest.java   |    2 +
 18 files changed, 1745 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 54b4ed7..2237e22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2170,6 +2170,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                     assert conflictCtx != null;
 
+                    boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+
                     // Use old value?
                     if (conflictCtx.isUseOld()) {
                         GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer;
@@ -2178,7 +2180,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         if (!isNew() &&                                                                       // Not initial value,
                             verCheck &&                                                                       // and atomic version check,
                             oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() &&                 // and data centers are equal,
-                            ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, true) == 0 && // and both versions are equal,
+                            ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, ignoreTime) == 0 && // and both versions are equal,
                             cctx.writeThrough() &&                                                            // and store is enabled,
                             primary)                                                                          // and we are primary.
                         {
@@ -2224,11 +2226,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     conflictVer = null;
             }
 
+            boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+
             // Perform version check only in case there was no explicit conflict resolution.
             if (conflictCtx == null) {
                 if (verCheck) {
-                    if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) >= 0) {
-                        if (ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) == 0 && cctx.writeThrough() && primary) {
+                    if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) >= 0) {
+                        if (ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) == 0 && cctx.writeThrough() && primary) {
                             if (log.isDebugEnabled())
                                 log.debug("Received entry update with same version as current (will update store) " +
                                     "[entry=" + this + ", newVer=" + newVer + ']');
@@ -2303,7 +2307,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     }
                 }
                 else
-                    assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) <= 0 :
+                    assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) <= 0 :
                         "Invalid version for inner update [isNew=" + isNew() + ", entry=" + this + ", newVer=" + newVer + ']';
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 5ff5aa4..219b04a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -85,6 +85,14 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     /** Update request. */
     final GridNearAtomicAbstractUpdateRequest updateReq;
 
+    /** Update response. */
+    @GridToStringExclude
+    private final GridNearAtomicUpdateResponse updateRes;
+
+    /** Completion callback. */
+    @GridToStringExclude
+    private final GridDhtAtomicCache.UpdateReplyClosure completionCb;
+
     /** Mappings. */
     @GridToStringExclude
     protected Map<UUID, GridDhtAtomicAbstractUpdateRequest> mappings;
@@ -99,16 +107,22 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      * @param cctx Cache context.
      * @param writeVer Write version.
      * @param updateReq Update request.
+     * @param updateRes Response.
+     * @param completionCb Callback to invoke to send response to near node.
      */
     protected GridDhtAtomicAbstractUpdateFuture(
         GridCacheContext cctx,
         GridCacheVersion writeVer,
-        GridNearAtomicAbstractUpdateRequest updateReq
-    ) {
+        GridNearAtomicAbstractUpdateRequest updateReq,
+        GridNearAtomicUpdateResponse updateRes,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb)
+    {
         this.cctx = cctx;
 
-        this.updateReq = updateReq;
         this.writeVer = writeVer;
+        this.updateReq = updateReq;
+        this.updateRes = updateRes;
+        this.completionCb = completionCb;
 
         futId = cctx.mvcc().atomicFutureId();
 
@@ -354,13 +368,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      *
      * @param nearNode Near node.
      * @param ret Cache operation return value.
-     * @param updateRes Response.
-     * @param completionCb Callback to invoke to send response to near node.
      */
-    final void map(ClusterNode nearNode,
-        GridCacheReturn ret,
-        GridNearAtomicUpdateResponse updateRes,
-        GridDhtAtomicCache.UpdateReplyClosure completionCb) {
+    final void map(ClusterNode nearNode, GridCacheReturn ret) {
         if (F.isEmpty(mappings)) {
             updateRes.dhtNodes(Collections.<UUID>emptyList());
 
@@ -371,23 +380,27 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
             return;
         }
 
-        boolean needReplyToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC ||
-            !ret.emptyResult() ||
-            updateRes.nearVersion() != null ||
-            cctx.localNodeId().equals(nearNode.id());
+        if (!updateReq.fastMap()) {
+            boolean needReplyToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC ||
+                !ret.emptyResult() ||
+                updateRes.nearVersion() != null ||
+                cctx.localNodeId().equals(nearNode.id());
 
-        boolean needMapping = updateReq.fullSync() && (updateReq.needPrimaryResponse() || !sendAllToDht());
+            boolean needMapping = updateReq.fullSync() && (updateReq.needPrimaryResponse() || !sendAllToDht());
 
-        if (needMapping) {
-            initMapping(updateRes);
+            if (needMapping) {
+                initMapping(updateRes);
 
-            needReplyToNear = true;
-        }
+                needReplyToNear = true;
+            }
 
-        sendDhtRequests(nearNode, ret);
+            sendDhtRequests(nearNode, ret);
 
-        if (needReplyToNear)
-            completionCb.apply(updateReq, updateRes);
+            if (needReplyToNear)
+                completionCb.apply(updateReq, updateRes);
+        }
+        else
+            sendDhtRequests(nearNode, ret);
     }
 
     /**
@@ -416,14 +429,14 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
             try {
                 assert !cctx.localNodeId().equals(req.nodeId()) : req;
 
-                if (updateReq.fullSync()) {
+                if (!updateReq.fastMap() && updateReq.fullSync()) {
                     req.nearReplyInfo(nearNode.id(), updateReq.futureId());
 
                     if (ret.emptyResult())
                         req.hasResult(true);
                 }
 
-                if (cntQryClsrs != null)
+                if (cntQryClsrs != null || updateReq.fastMap())
                     req.replyWithoutDelay(true);
 
                 cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
@@ -519,6 +532,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                     clsr.apply(suc);
             }
 
+            if (updateReq.fastMap())
+                completionCb.apply(updateReq, updateRes);
+
             return true;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index c20ed48..3735824 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -92,6 +92,7 @@ import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -106,7 +107,9 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
@@ -1115,27 +1118,54 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
 
-        final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
-            ctx,
-            this,
-            ctx.config().getWriteSynchronizationMode(),
-            op,
-            map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ?
-                conflictPutMap.keySet() : conflictRmvMap.keySet(),
-            map != null ? map.values() : invokeMap != null ? invokeMap.values() : null,
-            invokeArgs,
-            (Collection)(conflictPutMap != null ? conflictPutMap.values() : null),
-            conflictRmvMap != null ? conflictRmvMap.values() : null,
-            retval,
-            rawRetval,
-            opCtx != null ? opCtx.expiry() : null,
-            CU.filterArray(null),
-            subjId,
-            taskNameHash,
-            opCtx != null && opCtx.skipStore(),
-            opCtx != null && opCtx.isKeepBinary(),
-            opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
-            waitTopFut);
+        final GridNearAtomicAbstractUpdateFuture updateFut;
+
+        if (isFastMap(null, op)) {
+            updateFut = new GridNearAtomicFastMapUpdateFuture(
+                ctx,
+                this,
+                ctx.config().getWriteSynchronizationMode(),
+                op,
+                map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ?
+                    conflictPutMap.keySet() : conflictRmvMap.keySet(),
+                map != null ? map.values() : invokeMap != null ? invokeMap.values() : null,
+                invokeArgs,
+                (Collection)(conflictPutMap != null ? conflictPutMap.values() : null),
+                conflictRmvMap != null ? conflictRmvMap.values() : null,
+                retval,
+                rawRetval,
+                opCtx != null ? opCtx.expiry() : null,
+                CU.filterArray(null),
+                subjId,
+                taskNameHash,
+                opCtx != null && opCtx.skipStore(),
+                opCtx != null && opCtx.isKeepBinary(),
+                opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+                waitTopFut);
+        }
+        else {
+            updateFut = new GridNearAtomicUpdateFuture(
+                ctx,
+                this,
+                ctx.config().getWriteSynchronizationMode(),
+                op,
+                map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ?
+                    conflictPutMap.keySet() : conflictRmvMap.keySet(),
+                map != null ? map.values() : invokeMap != null ? invokeMap.values() : null,
+                invokeArgs,
+                (Collection)(conflictPutMap != null ? conflictPutMap.values() : null),
+                conflictRmvMap != null ? conflictRmvMap.values() : null,
+                retval,
+                rawRetval,
+                opCtx != null ? opCtx.expiry() : null,
+                CU.filterArray(null),
+                subjId,
+                taskNameHash,
+                opCtx != null && opCtx.skipStore(),
+                opCtx != null && opCtx.isKeepBinary(),
+                opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+                waitTopFut);
+        }
 
         if (async) {
             return asyncOp(new CO<IgniteInternalFuture<Object>>() {
@@ -1306,7 +1336,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         CacheEntryPredicate[] filters = CU.filterArray(filter);
 
-        if (conflictPutVal == null && conflictRmvVer == null) {
+        if (conflictPutVal == null &&
+            conflictRmvVer == null &&
+            !isFastMap(filters, op)) {
             return new GridNearAtomicSingleUpdateFuture(
                 ctx,
                 this,
@@ -1328,30 +1360,67 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             );
         }
         else {
-            return new GridNearAtomicUpdateFuture(
-                ctx,
-                this,
-                ctx.config().getWriteSynchronizationMode(),
-                op,
-                Collections.singletonList(key),
-                val0 != null ? Collections.singletonList(val0) : null,
-                invokeArgs,
-                conflictPutVal != null ? Collections.singleton(conflictPutVal) : null,
-                conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : null,
-                retval,
-                false,
-                opCtx != null ? opCtx.expiry() : null,
-                filters,
-                ctx.subjectIdPerCall(null, opCtx),
-                ctx.kernalContext().job().currentTaskNameHash(),
-                opCtx != null && opCtx.skipStore(),
-                opCtx != null && opCtx.isKeepBinary(),
-                opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
-                waitTopFut);
+            if (isFastMap(filters, op)) {
+                return new GridNearAtomicFastMapUpdateFuture(
+                    ctx,
+                    this,
+                    ctx.config().getWriteSynchronizationMode(),
+                    op,
+                    Collections.singletonList(key),
+                    val0 != null ? Collections.singletonList(val0) : null,
+                    invokeArgs,
+                    conflictPutVal != null ? Collections.singleton(conflictPutVal) : null,
+                    conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : null,
+                    retval,
+                    false,
+                    opCtx != null ? opCtx.expiry() : null,
+                    filters,
+                    ctx.subjectIdPerCall(null, opCtx),
+                    ctx.kernalContext().job().currentTaskNameHash(),
+                    opCtx != null && opCtx.skipStore(),
+                    opCtx != null && opCtx.isKeepBinary(),
+                    opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+                    waitTopFut);
+            }
+            else {
+                return new GridNearAtomicUpdateFuture(
+                    ctx,
+                    this,
+                    ctx.config().getWriteSynchronizationMode(),
+                    op,
+                    Collections.singletonList(key),
+                    val0 != null ? Collections.singletonList(val0) : null,
+                    invokeArgs,
+                    conflictPutVal != null ? Collections.singleton(conflictPutVal) : null,
+                    conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : null,
+                    retval,
+                    false,
+                    opCtx != null ? opCtx.expiry() : null,
+                    filters,
+                    ctx.subjectIdPerCall(null, opCtx),
+                    ctx.kernalContext().job().currentTaskNameHash(),
+                    opCtx != null && opCtx.skipStore(),
+                    opCtx != null && opCtx.isKeepBinary(),
+                    opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+                    waitTopFut);
+            }
         }
     }
 
     /**
+     * Whether this is fast-map operation.
+     *
+     * @param filters Filters.
+     * @param op Operation.
+     * @return {@code True} if fast-map.
+     */
+    public boolean isFastMap(CacheEntryPredicate[] filters, GridCacheOperation op) {
+        return F.isEmpty(filters) && op != TRANSFORM && ctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
+            ctx.config().getAtomicWriteOrderMode() == CLOCK &&
+            !(ctx.writeThrough() && ctx.config().getInterceptor() != null);
+    }
+
+    /**
      * Entry point for all public API remove methods.
      *
      * @param keys Keys to remove.
@@ -1394,26 +1463,52 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             });
         }
 
-        final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
-            ctx,
-            this,
-            ctx.config().getWriteSynchronizationMode(),
-            DELETE,
-            keys != null ? keys : conflictMap.keySet(),
-            null,
-            null,
-            null,
-            drVers != null ? drVers : (keys != null ? null : conflictMap.values()),
-            retval,
-            rawRetval,
-            opCtx != null ? opCtx.expiry() : null,
-            CU.filterArray(null),
-            subjId,
-            taskNameHash,
-            opCtx != null && opCtx.skipStore(),
-            opCtx != null && opCtx.isKeepBinary(),
-            opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
-            true);
+        final GridNearAtomicAbstractUpdateFuture updateFut;
+
+        if (isFastMap(null, DELETE)) {
+            updateFut = new GridNearAtomicFastMapUpdateFuture(
+                ctx,
+                this,
+                ctx.config().getWriteSynchronizationMode(),
+                DELETE,
+                keys != null ? keys : conflictMap.keySet(),
+                null,
+                null,
+                null,
+                drVers != null ? drVers : (keys != null ? null : conflictMap.values()),
+                retval,
+                rawRetval,
+                opCtx != null ? opCtx.expiry() : null,
+                CU.filterArray(null),
+                subjId,
+                taskNameHash,
+                opCtx != null && opCtx.skipStore(),
+                opCtx != null && opCtx.isKeepBinary(),
+                opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+                true);
+        }
+        else {
+            updateFut = new GridNearAtomicUpdateFuture(
+                ctx,
+                this,
+                ctx.config().getWriteSynchronizationMode(),
+                DELETE,
+                keys != null ? keys : conflictMap.keySet(),
+                null,
+                null,
+                null,
+                drVers != null ? drVers : (keys != null ? null : conflictMap.values()),
+                retval,
+                rawRetval,
+                opCtx != null ? opCtx.expiry() : null,
+                CU.filterArray(null),
+                subjId,
+                taskNameHash,
+                opCtx != null && opCtx.skipStore(),
+                opCtx != null && opCtx.isKeepBinary(),
+                opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+                true);
+        }
 
         if (async) {
             return asyncOp(new CO<IgniteInternalFuture<Object>>() {
@@ -1773,29 +1868,36 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         return;
                     }
 
-                    // Do not check topology version if topology was locked on near node by
+                    // Do not check topology version for CLOCK versioning since
+                    // partition exchange will wait for near update future (if future is on server node).
+                    // Also do not check topology version if topology was locked on near node by
                     // external transaction or explicit lock.
-                    if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
+                    if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
+                        !needRemap(req.topologyVersion(), top.topologyVersion())) {
                         locked = lockEntries(req, req.topologyVersion());
 
                         boolean hasNear = ctx.discovery().cacheNearNode(node, name());
 
-                        // Assign next version for update inside entries lock.
-                        GridCacheVersion ver = ctx.versions().next(top.topologyVersion());
+                        GridCacheVersion ver = req.updateVersion();
+
+                        if (ver == null) {
+                            // Assign next version for update inside entries lock.
+                            ver = ctx.versions().next(top.topologyVersion());
 
-                        if (hasNear)
-                            res.nearVersion(ver);
+                            if (hasNear)
+                                res.nearVersion(ver);
 
-                        if (msgLog.isDebugEnabled()) {
-                            msgLog.debug("Assigned update version [futId=" + req.futureId() +
-                                ", writeVer=" + ver + ']');
+                            if (msgLog.isDebugEnabled()) {
+                                msgLog.debug("Assigned update version [futId=" + req.futureId() +
+                                    ", writeVer=" + ver + ']');
+                            }
                         }
 
                         assert ver != null : "Got null version for update request: " + req;
 
                         boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
 
-                        dhtFut = createDhtFuture(ver, req);
+                        dhtFut = req.fastMap() ? null : createDhtFuture(ver, req, res, completionCb);
 
                         expiry = expiryPolicy(req.expiry());
 
@@ -1812,6 +1914,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 hasNear,
                                 req,
                                 res,
+                                completionCb,
                                 locked,
                                 ver,
                                 dhtFut,
@@ -1831,6 +1934,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 hasNear,
                                 req,
                                 res,
+                                completionCb,
                                 locked,
                                 ver,
                                 dhtFut,
@@ -1910,9 +2014,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             completionCb.apply(req, res);
         }
-        else
+        else {
             if (dhtFut != null)
-                dhtFut.map(node, res.returnValue(), res, completionCb);
+                dhtFut.map(node, res.returnValue());
+            else {
+                assert req.fastMap() : req;
+
+                completionCb.apply(req, res);
+            }
+        }
 
         if (req.writeSynchronizationMode() != FULL_ASYNC)
             req.cleanup(!node.isLocal());
@@ -1943,6 +2053,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final boolean hasNear,
         final GridNearAtomicAbstractUpdateRequest req,
         final GridNearAtomicUpdateResponse res,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb,
         final List<GridDhtCacheEntry> locked,
         final GridCacheVersion ver,
         @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
@@ -1990,6 +2101,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         for (int i = 0; i < locked.size(); i++) {
             GridDhtCacheEntry entry = locked.get(i);
 
+            if (entry == null)
+                continue;
+
             try {
                 if (!checkFilter(entry, req, res)) {
                     if (expiry != null && entry.hasValue()) {
@@ -2095,6 +2209,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 dhtFut,
                                 req,
                                 res,
+                                completionCb,
                                 replicate,
                                 updRes,
                                 taskName,
@@ -2143,6 +2258,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 dhtFut,
                                 req,
                                 res,
+                                completionCb,
                                 replicate,
                                 updRes,
                                 taskName,
@@ -2269,6 +2385,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 dhtFut,
                 req,
                 res,
+                completionCb,
                 replicate,
                 updRes,
                 taskName,
@@ -2358,6 +2475,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         boolean hasNear,
         GridNearAtomicAbstractUpdateRequest req,
         GridNearAtomicUpdateResponse res,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb,
         List<GridDhtCacheEntry> locked,
         GridCacheVersion ver,
         @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
@@ -2373,6 +2491,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer);
 
+        boolean readersOnly = false;
+
         boolean intercept = ctx.config().getInterceptor() != null;
 
         AffinityAssignment affAssignment = ctx.affinity().assignment(topVer);
@@ -2388,12 +2508,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             try {
                 GridDhtCacheEntry entry = locked.get(i);
 
+                if (entry == null)
+                    continue;
+
                 GridCacheVersion newConflictVer = req.conflictVersion(i);
                 long newConflictTtl = req.conflictTtl(i);
                 long newConflictExpireTime = req.conflictExpireTime(i);
 
                 assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
 
+                boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(), entry.partition(),
+                    req.topologyVersion());
+
                 Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i);
 
                 Collection<UUID> readers = null;
@@ -2411,18 +2537,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     op,
                     writeVal,
                     req.invokeArguments(),
-                    writeThrough() && !req.skipStore(),
+                    (primary || (ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly()))
+                        && writeThrough() && !req.skipStore(),
                     !req.skipStore(),
                     sndPrevVal || req.returnValue(),
                     req.keepBinary(),
                     expiry,
                     /*event*/true,
                     /*metrics*/true,
-                    /*primary*/true,
-                    /*verCheck*/false,
+                    primary,
+                    ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
                     topVer,
                     req.filter(),
-                    replicate ? DR_PRIMARY : DR_NONE,
+                    replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
                     newConflictTtl,
                     newConflictExpireTime,
                     newConflictVer,
@@ -2434,6 +2561,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     /*updateCntr*/null,
                     dhtFut);
 
+                if (dhtFut == null && !F.isEmpty(filteredReaders)) {
+                    dhtFut = createDhtFuture(ver, req, res, completionCb);
+
+                    readersOnly = true;
+                }
+
                 if (dhtFut != null) {
                     if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
                         GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
@@ -2445,17 +2578,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         EntryProcessor<Object, Object, Object> entryProcessor = null;
 
-                        dhtFut.addWriteEntry(
-                            affAssignment,
-                            entry,
-                            updRes.newValue(),
-                            entryProcessor,
-                            updRes.newTtl(),
-                            updRes.conflictExpireTime(),
-                            newConflictVer,
-                            sndPrevVal,
-                            updRes.oldValue(),
-                            updRes.updateCounter());
+                        if (!readersOnly) {
+                            dhtFut.addWriteEntry(
+                                affAssignment,
+                                entry,
+                                updRes.newValue(),
+                                entryProcessor,
+                                updRes.newTtl(),
+                                updRes.conflictExpireTime(),
+                                newConflictVer,
+                                sndPrevVal,
+                                updRes.oldValue(),
+                                updRes.updateCounter());
+                        }
 
                         if (!F.isEmpty(filteredReaders))
                             dhtFut.addNearWriteEntries(
@@ -2474,7 +2609,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
 
                 if (hasNear) {
-                    if (updRes.sendToDht()) {
+                    if (primary && updRes.sendToDht()) {
                         if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
                             // If put the same value as in request then do not need to send it back.
                             if (op == TRANSFORM || writeVal != updRes.newValue()) {
@@ -2580,6 +2715,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
         final GridNearAtomicAbstractUpdateRequest req,
         final GridNearAtomicUpdateResponse res,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb,
         final boolean replicate,
         final UpdateBatchResult batchRes,
         final String taskName,
@@ -2600,8 +2736,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             GridCacheOperation op;
 
             if (putMap != null) {
+                // If fast mapping, filter primary keys for write to store.
+                Map<KeyCacheObject, CacheObject> storeMap = req.fastMap() ?
+                    F.view(putMap, new P1<CacheObject>() {
+                        @Override public boolean apply(CacheObject key) {
+                            return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion());
+                        }
+                    }) :
+                    putMap;
+
                 try {
-                    ctx.store().putAll(null, F.viewReadOnly(putMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() {
+                    ctx.store().putAll(null, F.viewReadOnly(storeMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() {
                         @Override public IgniteBiTuple<CacheObject, GridCacheVersion> apply(CacheObject v) {
                             return F.t(v, ver);
                         }
@@ -2614,8 +2759,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 op = UPDATE;
             }
             else {
+                // If fast mapping, filter primary keys for write to store.
+                Collection<KeyCacheObject> storeKeys = req.fastMap() ?
+                    F.view(rmvKeys, new P1<Object>() {
+                        @Override public boolean apply(Object key) {
+                            return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion());
+                        }
+                    }) :
+                    rmvKeys;
+
                 try {
-                    ctx.store().removeAll(null, rmvKeys);
+                    ctx.store().removeAll(null, storeKeys);
                 }
                 catch (CacheStorePartialUpdateException e) {
                     storeErr = e;
@@ -2650,6 +2804,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                     assert writeVal != null || op == DELETE : "null write value found.";
 
+                    boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(),
+                        entry.partition(),
+                        req.topologyVersion());
+
                     Collection<UUID> readers = null;
                     Collection<UUID> filteredReaders = null;
 
@@ -2672,11 +2830,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         expiry,
                         /*event*/true,
                         /*metrics*/true,
-                        /*primary*/true,
-                        /*verCheck*/false,
+                        primary,
+                        ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
                         topVer,
                         null,
-                        replicate ? DR_PRIMARY : DR_NONE,
+                        replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
                         CU.TTL_NOT_CHANGED,
                         CU.EXPIRE_TIME_CALCULATE,
                         null,
@@ -2710,23 +2868,31 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                     batchRes.addDeleted(entry, updRes, entries);
 
+                    if (dhtFut == null && !F.isEmpty(filteredReaders)) {
+                        dhtFut = createDhtFuture(ver, req, res, completionCb);
+
+                        batchRes.readersOnly(true);
+                    }
+
                     if (dhtFut != null) {
                         EntryProcessor<Object, Object, Object> entryProcessor =
                             entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
 
-                        dhtFut.addWriteEntry(
-                            affAssignment,
-                            entry,
-                            writeVal,
-                            entryProcessor,
-                            updRes.newTtl(),
-                            CU.EXPIRE_TIME_CALCULATE,
-                            null,
-                            sndPrevVal,
-                            updRes.oldValue(),
-                            updRes.updateCounter());
+                        if (!batchRes.readersOnly()) {
+                            dhtFut.addWriteEntry(
+                                affAssignment,
+                                entry,
+                                writeVal,
+                                entryProcessor,
+                                updRes.newTtl(),
+                                CU.EXPIRE_TIME_CALCULATE,
+                                null,
+                                sndPrevVal,
+                                updRes.oldValue(),
+                                updRes.updateCounter());
+                        }
 
-                        if (!F.isEmpty(filteredReaders))
+                        if (!F.isEmpty(filteredReaders)) {
                             dhtFut.addNearWriteEntries(
                                 filteredReaders,
                                 entry,
@@ -2734,29 +2900,34 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 entryProcessor,
                                 updRes.newTtl(),
                                 CU.EXPIRE_TIME_CALCULATE);
+                        }
                     }
 
                     if (hasNear) {
-                        if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
-                            int idx = firstEntryIdx + i;
-
-                            if (req.operation() == TRANSFORM) {
-                                res.addNearValue(idx,
-                                    writeVal,
-                                    updRes.newTtl(),
-                                    CU.EXPIRE_TIME_CALCULATE);
-                            }
-                            else
-                                res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
+                        if (primary) {
+                            if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
+                                int idx = firstEntryIdx + i;
+
+                                if (req.operation() == TRANSFORM) {
+                                    res.addNearValue(idx,
+                                        writeVal,
+                                        updRes.newTtl(),
+                                        CU.EXPIRE_TIME_CALCULATE);
+                                }
+                                else
+                                    res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
 
-                            if (writeVal != null || entry.hasValue()) {
-                                IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
+                                if (writeVal != null || entry.hasValue()) {
+                                    IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
 
-                                assert f == null : f;
+                                    assert f == null : f;
+                                }
                             }
+                            else if (readers.contains(nearNode.id())) // Reader became primary or backup.
+                                entry.removeReader(nearNode.id(), req.messageId());
+                            else
+                                res.addSkippedIndex(firstEntryIdx + i);
                         }
-                        else if (readers.contains(nearNode.id())) // Reader became primary or backup.
-                            entry.removeReader(nearNode.id(), req.messageId());
                         else
                             res.addSkippedIndex(firstEntryIdx + i);
                     }
@@ -2800,14 +2971,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             KeyCacheObject key = req.key(0);
 
             while (true) {
-                GridDhtCacheEntry entry = entryExx(key, topVer);
+                try {
+                    GridDhtCacheEntry entry = entryExx(key, topVer);
 
-                GridUnsafe.monitorEnter(entry);
+                    GridUnsafe.monitorEnter(entry);
 
-                if (entry.obsolete())
-                    GridUnsafe.monitorExit(entry);
-                else
-                    return Collections.singletonList(entry);
+                    if (entry.obsolete())
+                        GridUnsafe.monitorExit(entry);
+                    else
+                        return Collections.singletonList(entry);
+                }
+                catch (GridDhtInvalidPartitionException e) {
+                    // Ignore invalid partition exception in CLOCK ordering mode.
+                    if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
+                        return Collections.singletonList(null);
+                    else
+                        throw e;
+                }
             }
         }
         else {
@@ -2815,9 +2995,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             while (true) {
                 for (int i = 0; i < req.size(); i++) {
-                    GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
+                    try {
+                        GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
 
-                    locked.add(entry);
+                        locked.add(entry);
+                    }
+                    catch (GridDhtInvalidPartitionException e) {
+                        // Ignore invalid partition exception in CLOCK ordering mode.
+                        if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
+                            locked.add(null);
+                        else
+                            throw e;
+                    }
                 }
 
                 boolean retry = false;
@@ -2970,28 +3159,54 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             drPutVals = null;
         }
 
-        GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
-            ctx,
-            this,
-            ctx.config().getWriteSynchronizationMode(),
-            req.operation(),
-            req.keys(),
-            vals,
-            req.invokeArguments(),
-            drPutVals,
-            drRmvVals,
-            req.returnValue(),
-            false,
-            req.expiry(),
-            req.filter(),
-            req.subjectId(),
-            req.taskNameHash(),
-            req.skipStore(),
-            req.keepBinary(),
-            MAX_RETRIES,
-            true);
+        if (isFastMap(req.filter(), req.operation())) {
+            GridNearAtomicFastMapUpdateFuture updateFut = new GridNearAtomicFastMapUpdateFuture(
+                ctx,
+                this,
+                ctx.config().getWriteSynchronizationMode(),
+                req.operation(),
+                req.keys(),
+                vals,
+                req.invokeArguments(),
+                drPutVals,
+                drRmvVals,
+                req.returnValue(),
+                false,
+                req.expiry(),
+                req.filter(),
+                req.subjectId(),
+                req.taskNameHash(),
+                req.skipStore(),
+                req.keepBinary(),
+                MAX_RETRIES,
+                true);
 
-        updateFut.map();
+            updateFut.map();
+        }
+        else {
+            GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
+                ctx,
+                this,
+                ctx.config().getWriteSynchronizationMode(),
+                req.operation(),
+                req.keys(),
+                vals,
+                req.invokeArguments(),
+                drPutVals,
+                drRmvVals,
+                req.returnValue(),
+                false,
+                req.expiry(),
+                req.filter(),
+                req.subjectId(),
+                req.taskNameHash(),
+                req.skipStore(),
+                req.keepBinary(),
+                MAX_RETRIES,
+                true);
+
+            updateFut.map();
+        }
     }
 
     /**
@@ -3003,12 +3218,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     private GridDhtAtomicAbstractUpdateFuture createDhtFuture(
         GridCacheVersion writeVer,
-        GridNearAtomicAbstractUpdateRequest updateReq
+        GridNearAtomicAbstractUpdateRequest updateReq,
+        GridNearAtomicUpdateResponse updateRes,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb
     ) {
         if (updateReq.size() == 1)
-            return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq);
+            return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq, updateRes, completionCb);
         else
-            return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq);
+            return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq, updateRes, completionCb);
     }
 
     /**
@@ -3520,6 +3737,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         private GridDhtAtomicAbstractUpdateFuture dhtFut;
 
         /** */
+        private boolean readersOnly;
+
+        /** */
         private GridCacheReturn invokeRes;
 
         /**
@@ -3572,6 +3792,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         private void dhtFuture(@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut) {
             this.dhtFut = dhtFut;
         }
+
+        /**
+         * @return {@code True} if only readers (not backups) should be updated.
+         */
+        private boolean readersOnly() {
+            return readersOnly;
+        }
+
+        /**
+         * @param readersOnly {@code True} if only readers (not backups) should be updated.
+         */
+        private void readersOnly(boolean readersOnly) {
+            this.readersOnly = readersOnly;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 8ebe9c3..8ad6496 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -50,9 +50,11 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
     GridDhtAtomicSingleUpdateFuture(
         GridCacheContext cctx,
         GridCacheVersion writeVer,
-        GridNearAtomicAbstractUpdateRequest updateReq
+        GridNearAtomicAbstractUpdateRequest updateReq,
+        GridNearAtomicUpdateResponse updateRes,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb
     ) {
-        super(cctx, writeVer, updateReq);
+        super(cctx, writeVer, updateReq, updateRes, completionCb);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 5d5ddf0..6de08c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -49,9 +49,11 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
     GridDhtAtomicUpdateFuture(
         GridCacheContext cctx,
         GridCacheVersion writeVer,
-        GridNearAtomicAbstractUpdateRequest updateReq
+        GridNearAtomicAbstractUpdateRequest updateReq,
+        GridNearAtomicUpdateResponse updateRes,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb
     ) {
-        super(cctx, writeVer, updateReq);
+        super(cctx, writeVer, updateReq, updateRes, completionCb);
 
         mappings = U.newHashMap(updateReq.size());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
index 6811236..10f368e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
@@ -73,6 +73,8 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
         boolean retval,
         @Nullable UUID subjId,
         int taskNameHash,
+        boolean fastMap,
+        boolean clientReq,
         boolean mappingKnown,
         boolean skipStore,
         boolean keepBinary,
@@ -88,6 +90,8 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
             retval,
             subjId,
             taskNameHash,
+            fastMap,
+            clientReq,
             mappingKnown,
             skipStore,
             keepBinary,

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 39abb73..59f3e76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -51,6 +52,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
@@ -140,6 +142,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     @GridToStringInclude
     protected Long futId;
 
+    /** */
+    protected GridCacheVersion updVer;
+
     /** Operation result. */
     protected GridCacheReturn opRes;
 
@@ -208,12 +213,32 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     }
 
     /** {@inheritDoc} */
-    @Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+    @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
         return null;
     }
 
     /**
      * @param req Request.
+     * @param topVer Topology version.
+     */
+    final void initUpdateVersion(GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer) {
+        // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+        if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
+            GridCacheVersion updVer = this.updVer;
+
+            if (updVer == null) {
+                this.updVer = updVer = cctx.versions().next(topVer);
+
+                if (log.isDebugEnabled())
+                    log.debug("Assigned fast-map version for update on near node: " + updVer);
+            }
+
+            req.updateVersion(updVer);
+        }
+    }
+
+    /**
+     * @param req Request.
      */
     void sendCheckUpdateRequest(GridNearAtomicCheckUpdateRequest req) {
         try {
@@ -241,14 +266,29 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             // Cannot remap.
             remapCnt = 1;
 
-            map(topVer);
+            if (fastMap()) {
+                Long futId = addAtomicFuture(topVer);
+
+                if (futId != null)
+                    map(topVer, futId);
+            }
+            else
+                map(topVer, futId);
         }
     }
 
     /**
+     * @return {@code True} for fast map update mode.
+     */
+    protected boolean fastMap() {
+        return false;
+    }
+
+    /**
      * @param topVer Topology version.
+     * @param futId Future ID.
      */
-    protected abstract void map(AffinityTopologyVersion topVer);
+    protected abstract void map(AffinityTopologyVersion topVer, Long futId);
 
     /**
      * Maps future on ready topology.
@@ -274,7 +314,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
      */
     final boolean storeFuture() {
-        return syncMode != FULL_ASYNC;
+        return syncMode != FULL_ASYNC || cctx.config().getAtomicWriteOrderMode() == CLOCK;
     }
 
     /**
@@ -416,6 +456,30 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     }
 
     /**
+     * Adds future prevents topology change before operation complete.
+     * Should be invoked before topology lock released.
+     *
+     * @param topVer Topology version.
+     * @return Future version in case future added.
+     */
+    protected final Long addAtomicFuture(AffinityTopologyVersion topVer) {
+        Long futId = cctx.mvcc().atomicFutureId();
+
+        synchronized (mux) {
+            assert this.futId == null : this;
+            assert this.topVer == AffinityTopologyVersion.ZERO : this;
+
+            this.topVer = topVer;
+            this.futId = futId;
+        }
+
+        if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this))
+            return null;
+
+        return futId;
+    }
+
+    /**
      *
      */
     static class PrimaryRequestState {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index a43bfb0..bdf2678 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -48,7 +48,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     /** Message index. */
     public static final int CACHE_MSG_IDX = nextIndexId();
 
-    /** . */
+    /** */
     private static final int NEED_PRIMARY_RES_FLAG_MASK = 0x01;
 
     /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
@@ -63,6 +63,15 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     /** Return value flag. */
     private static final int RET_VAL_FLAG_MASK = 0x10;
 
+    /** Fast map update flag. */
+    private static final int FAST_MAP_FLAG_MASK = 0x20;
+
+    /** Client node request flag. */
+    private static final int CLIENT_REQ_FLAG_MASK = 0x40;
+
+    /** Client node request flag. */
+    private static final int HAS_PRIMARY_FLAG_MASK = 0x80;
+
     /** Target node ID. */
     @GridDirectTransient
     protected UUID nodeId;
@@ -93,6 +102,9 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     @GridDirectTransient
     private GridNearAtomicUpdateResponse res;
 
+    /** Update version. Set to non-null if fastMap is {@code true}. */
+    private GridCacheVersion updateVer;
+
     /**
      *
      */
@@ -129,6 +141,8 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
         boolean retval,
         @Nullable UUID subjId,
         int taskNameHash,
+        boolean fastMap,
+        boolean clientReq,
         boolean needPrimaryRes,
         boolean skipStore,
         boolean keepBinary,
@@ -144,16 +158,62 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
         this.taskNameHash = taskNameHash;
         this.addDepInfo = addDepInfo;
 
+        if (fastMap)
+            setFlag(true, FAST_MAP_FLAG_MASK);
+        if (clientReq)
+            setFlag(true, CLIENT_REQ_FLAG_MASK);
         if (needPrimaryRes)
-            needPrimaryResponse(true);
+            setFlag(true, NEED_PRIMARY_RES_FLAG_MASK);
         if (topLocked)
-            topologyLocked(true);
+            setFlag(true, TOP_LOCKED_FLAG_MASK);
         if (retval)
-            returnValue(true);
+            setFlag(true, RET_VAL_FLAG_MASK);
         if (skipStore)
-            skipStore(true);
+            setFlag(true, SKIP_STORE_FLAG_MASK);
         if (keepBinary)
-            keepBinary(true);
+            setFlag(true, KEEP_BINARY_FLAG_MASK);
+    }
+
+    /**
+     * @return Flag indicating whether this request contains primary keys.
+     */
+    boolean hasPrimary() {
+        return isFlag(HAS_PRIMARY_FLAG_MASK);
+    }
+
+    /**
+     * @param val Flag indicating whether this request contains primary keys.
+     */
+    void hasPrimary(boolean val) {
+        setFlag(val, HAS_PRIMARY_FLAG_MASK);
+    }
+
+    /**
+     * @return Flag indicating whether this is fast-map update.
+     */
+    boolean fastMap() {
+        return isFlag(FAST_MAP_FLAG_MASK);
+    }
+
+    /**
+     * @return {@code True} if request sent from client node.
+     */
+    boolean clientRequest() {
+        return isFlag(CLIENT_REQ_FLAG_MASK);
+    }
+
+    /**
+     * @return Update version for fast-map request.
+     */
+    @Nullable public final GridCacheVersion updateVersion() {
+        return updateVer;
+    }
+
+    /**
+     * @param updateVer Update version for fast-map request.
+     */
+    final void updateVersion(GridCacheVersion updateVer) {
+        this.updateVer = updateVer;
     }
 
     /** {@inheritDoc} */
@@ -291,13 +351,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     }
 
     /**
-     * @param val {@code True} if topology is locked on near node.
-     */
-    private void topologyLocked(boolean val) {
-        setFlag(val, TOP_LOCKED_FLAG_MASK);
-    }
-
-    /**
      * @return Return value flag.
      */
     public final boolean returnValue() {
@@ -305,13 +358,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     }
 
     /**
-     * @param val Return value flag.
-     */
-    public final void returnValue(boolean val) {
-        setFlag(val, RET_VAL_FLAG_MASK);
-    }
-
-    /**
      * @return Skip write-through to a persistent storage.
      */
     public final boolean skipStore() {
@@ -319,13 +365,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     }
 
     /**
-     * @param val Skip store flag.
-     */
-    public void skipStore(boolean val) {
-        setFlag(val, SKIP_STORE_FLAG_MASK);
-    }
-
-    /**
      * @return Keep binary flag.
      */
     public final boolean keepBinary() {
@@ -333,13 +372,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     }
 
     /**
-     * @param val Keep binary flag.
-     */
-    public void keepBinary(boolean val) {
-        setFlag(val, KEEP_BINARY_FLAG_MASK);
-    }
-
-    /**
      * Sets flag mask.
      *
      * @param flag Set or clear.
@@ -380,12 +412,14 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
      * @param conflictTtl Conflict TTL (optional).
      * @param conflictExpireTime Conflict expire time (optional).
      * @param conflictVer Conflict version (optional).
+     * @param primary If given key is primary on this mapping.
      */
     abstract void addUpdateEntry(KeyCacheObject key,
         @Nullable Object val,
         long conflictTtl,
         long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer);
+        @Nullable GridCacheVersion conflictVer,
+        boolean primary);
 
     /**
      * @return Keys for this update request.
@@ -458,7 +492,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 10;
+        return 11;
     }
 
     /** {@inheritDoc} */
@@ -518,6 +552,12 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
 
                 writer.incrementState();
 
+            case 10:
+                if (!writer.writeMessage("updateVer", updateVer))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -598,6 +638,14 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
 
                 reader.incrementState();
 
+            case 10:
+                updateVer = reader.readMessage("updateVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridNearAtomicAbstractUpdateRequest.class);