You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/17 15:04:47 UTC

[02/10] ignite git commit: ignite-4680-2

ignite-4680-2


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d1b4ebd4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d1b4ebd4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d1b4ebd4

Branch: refs/heads/ignite-4680-sb
Commit: d1b4ebd48290af5fb3a2d0d9df57219dd4ce1edb
Parents: 8e5e3cb
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Wed Mar 15 19:41:49 2017 +0300
Committer: Konstantin Dudkov <kd...@ya.ru>
Committed: Wed Mar 15 19:41:49 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |   6 +-
 .../ignite/internal/IgniteNodeAttributes.java   |   3 +
 .../managers/communication/GridIoManager.java   |  23 +-
 .../cache/GridCacheAffinityManager.java         |  21 +-
 .../processors/cache/GridCacheAtomicFuture.java |   2 +-
 .../cache/GridCacheEvictionManager.java         |   2 +-
 .../processors/cache/GridCacheIoManager.java    |   6 +-
 .../cache/GridCacheManagerAdapter.java          |   2 +-
 .../processors/cache/GridCacheMvccManager.java  |  14 +-
 .../GridDhtAtomicAbstractUpdateFuture.java      |   2 +-
 .../dht/atomic/GridDhtAtomicCache.java          | 184 ++++++++---
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   5 +-
 .../GridNearAtomicAbstractUpdateFuture.java     | 102 +++++-
 .../GridNearAtomicAbstractUpdateRequest.java    |  46 +--
 .../atomic/GridNearAtomicFullUpdateRequest.java | 127 ++++++-
 .../GridNearAtomicSingleUpdateFuture.java       |  25 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  72 ++--
 .../atomic/GridNearAtomicUpdateResponse.java    |  40 ++-
 .../distributed/near/GridNearAtomicCache.java   |  27 +-
 .../cache/version/GridCacheVersionManager.java  |  47 +++
 .../apache/ignite/internal/util/MPSCQueue.java  | 331 +++++++++++++++++++
 .../ignite/internal/util/StripedExecutor.java   | 125 ++++++-
 .../ignite/thread/IgniteStripeThread.java       |  47 +++
 .../GridNearAtomicFullUpdateRequestTest.java    |  74 +++++
 .../dht/atomic/MarshallingAbstractTest.java     | 156 +++++++++
 .../version/GridCacheVersionManagerTest.java    |  86 +++++
 26 files changed, 1380 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 2a6706e..d37e4fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -227,6 +227,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PREFIX;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_RESTART_ENABLED;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_REST_PORT_RANGE;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SPI_CLASS;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_STRIPES_CNT;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_USER_NAME;
 import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
 import static org.apache.ignite.internal.IgniteVersionUtils.BUILD_TSTAMP_STR;
@@ -1167,7 +1168,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                                 pubPoolIdleThreads + ", qSize=" + pubPoolQSize + "]" + NL +
                                 "    ^-- System thread pool [active=" + sysPoolActiveThreads + ", idle=" +
                                 sysPoolIdleThreads + ", qSize=" + sysPoolQSize + "]" + NL +
-                                "    ^-- Outbound messages queue [size=" + m.getOutboundMessagesQueueSize() + "]";
+                                "    ^-- Outbound messages queue [size=" + m.getOutboundMessagesQueueSize() + "]" + NL +
+                                "    ^-- Striped executor [" + ctx.getStripedExecutorService().getMetrics() + "]";
 
                             log.info(msg);
                         }
@@ -1440,6 +1442,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
         add(ATTR_CONSISTENCY_CHECK_SKIPPED, getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK));
 
+        add(ATTR_STRIPES_CNT, ctx.getStripedExecutorService() != null ? ctx.getStripedExecutorService().stripes() : -1);
+
         if (cfg.getConsistentId() != null)
             add(ATTR_NODE_CONSISTENT_ID, cfg.getConsistentId());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
index 8294026..f1a2558 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
@@ -168,6 +168,9 @@ public final class IgniteNodeAttributes {
     /** Ignite services compatibility mode (can be {@code null}). */
     public static final String ATTR_SERVICES_COMPATIBILITY_MODE = ATTR_PREFIX + ".services.compatibility.enabled";
 
+    /** Number of stripes in the node's striped executor ({@code -1} if the executor is unavailable). */
+    public static final String ATTR_STRIPES_CNT = ATTR_PREFIX + ".cache.stripes.cnt";
+
     /**
      * Enforces singleton.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 23738d7..5fd2845 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -54,11 +54,13 @@ import org.apache.ignite.internal.managers.GridManagerAdapter;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
 import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
 import org.apache.ignite.internal.processors.pool.PoolProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
+import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridTuple3;
@@ -97,9 +99,9 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGF
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.P2P_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.isReservedGridIoPolicy;
 import static org.apache.ignite.internal.util.nio.GridNioBackPressureControl.threadProcessingMessage;
 import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV;
@@ -807,19 +809,32 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             }
         };
 
+        StripedExecutor stripedExecutor = ctx.getStripedExecutorService();
+
         if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) {
             IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message();
 
             if (msg0.processFromNioThread())
                 c.run();
             else
-                ctx.getStripedExecutorService().execute(-1, c);
+                stripedExecutor.execute(-1, c);
 
             return;
         }
 
-        if (plc == GridIoPolicy.SYSTEM_POOL && msg.partition() != Integer.MIN_VALUE) {
-            ctx.getStripedExecutorService().execute(msg.partition(), c);
+        if (plc == GridIoPolicy.SYSTEM_POOL &&
+            (msg.partition() != Integer.MIN_VALUE ||
+                msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE)) {
+            Map<Integer, int[]> stripemap = msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE ?
+                ((GridNearAtomicFullUpdateRequest)msg.message()).stripeMap() : null;
+
+            if (stripemap != null) {
+                for (Integer stripe : stripemap.keySet()) {
+                    stripedExecutor.execute(stripe, c);
+                }
+            }
+            else
+                stripedExecutor.execute(msg.partition(), c);
 
             return;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 17c9319..621634c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -17,6 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.binary.BinaryObject;
@@ -33,12 +38,6 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteFuture;
 import org.jetbrains.annotations.Nullable;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
 /**
  * Cache affinity manager.
  */
@@ -88,10 +87,15 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
      *
      */
     public void cancelFutures() {
+        if (!starting.get())
+            // Ignoring attempt to stop manager that has never been started.
+            return;
+
         IgniteCheckedException err =
             new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping.");
 
-        aff.cancelFutures(err);
+        if (aff != null)
+            aff.cancelFutures(err);
     }
 
     /** {@inheritDoc} */
@@ -99,7 +103,8 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
         IgniteCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
             "Failed to wait for topology update, client disconnected.");
 
-        aff.cancelFutures(err);
+        if (aff != null)
+            aff.cancelFutures(err);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 8df229e..87ae29c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -27,7 +27,7 @@ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
     /**
      * @return Future ID.
      */
-    public Long id();
+    public long id();
 
     /**
      * Gets future that will be completed when it is safe when update is finished on the given version of topology.

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index 06c707e..e76b773 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -1966,7 +1966,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
             if (lock.readLock().tryLock()) {
                 try {
                     if (log.isDebugEnabled())
-                        log.debug("Entered to eviction future onResponse() [fut=" + this + ", node=" + nodeId +
+                        log.debug("Entered to eviction future storeResponse() [fut=" + this + ", node=" + nodeId +
                             ", res=" + res + ']');
 
                     ClusterNode node = cctx.node(nodeId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 99878ec..3cf68d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -431,7 +431,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 append(", dhtTxId=").append(dhtTxId(cacheMsg)).
                 append(", msg=").append(cacheMsg);
         }
-        else if (atomicFututeId(cacheMsg) != null) {
+        else if (atomicFututeId(cacheMsg) > -1) {
             builder.append("futId=").append(atomicFututeId(cacheMsg)).
                 append(", writeVer=").append(atomicWriteVersion(cacheMsg)).
                 append(", msg=").append(cacheMsg);
@@ -484,7 +484,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      * @param cacheMsg Cache message.
      * @return Atomic future ID if applicable for message.
      */
-    @Nullable private Long atomicFututeId(GridCacheMessage cacheMsg) {
+    @Nullable private long atomicFututeId(GridCacheMessage cacheMsg) {
         if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest)
             return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureId();
         else if (cacheMsg instanceof GridNearAtomicUpdateResponse)
@@ -496,7 +496,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
         else if (cacheMsg instanceof GridNearAtomicCheckUpdateRequest)
             return ((GridNearAtomicCheckUpdateRequest)cacheMsg).futureId();
 
-        return null;
+        return -1;
     }
 
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
index 8ad0ea8..ab965de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
@@ -34,7 +34,7 @@ public class GridCacheManagerAdapter<K, V> implements GridCacheManager<K, V> {
     protected IgniteLogger log;
 
     /** Starting flag. */
-    private final AtomicBoolean starting = new AtomicBoolean(false);
+    protected final AtomicBoolean starting = new AtomicBoolean(false);
 
     /** {@inheritDoc} */
     @Override public final void start(GridCacheContext<K, V> cctx) throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index dff2c88..c4cbefd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -132,7 +132,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     };
 
     /** Logger. */
-    @SuppressWarnings( {"FieldAccessedSynchronizedAndUnsynchronized"})
+    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     private IgniteLogger exchLog;
 
     /** */
@@ -256,9 +256,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                 cacheFut.onNodeLeft(discoEvt.eventNode().id());
 
                 if (cacheFut.isCancelled() || cacheFut.isDone()) {
-                    Long futId = cacheFut.id();
+                    long futId = cacheFut.id();
 
-                    if (futId != null)
+                    if (futId > -1)
                         atomicFuts.remove(futId, cacheFut);
                 }
             }
@@ -437,7 +437,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      * @param fut Future.
      * @return {@code False} if future was forcibly completed with error.
      */
-    public boolean addAtomicFuture(Long futId, GridCacheAtomicFuture<?> fut) {
+    public boolean addAtomicFuture(long futId, GridCacheAtomicFuture<?> fut) {
         IgniteInternalFuture<?> old = atomicFuts.put(futId, fut);
 
         assert old == null : "Old future is not null [futId=" + futId + ", fut=" + fut + ", old=" + old + ']';
@@ -472,7 +472,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      * @param futId Future ID.
      * @return Future.
      */
-    @Nullable public IgniteInternalFuture<?> atomicFuture(Long futId) {
+    @Nullable public IgniteInternalFuture<?> atomicFuture(long futId) {
         return atomicFuts.get(futId);
     }
 
@@ -480,7 +480,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      * @param futId Future ID.
      * @return Removed future.
      */
-    @Nullable public IgniteInternalFuture<?> removeAtomicFuture(Long futId) {
+    @Nullable public IgniteInternalFuture<?> removeAtomicFuture(long futId) {
         return atomicFuts.remove(futId);
     }
 
@@ -511,8 +511,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-
-    /**
      * Adds future.
      *
      * @param fut Future.

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 5ff5aa4..17ee298 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -295,7 +295,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     }
 
     /** {@inheritDoc} */
-    @Override public final Long id() {
+    public final long id() {
         return futId;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index c20ed48..4ab5000 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -101,6 +101,7 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityPermission;
+import org.apache.ignite.thread.IgniteStripeThread;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
 
@@ -278,9 +279,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     UUID nodeId,
                     GridNearAtomicAbstractUpdateRequest req
                 ) {
+                    int stripeIdx;
+
+                    if (req instanceof GridNearAtomicFullUpdateRequest && Thread.currentThread() instanceof IgniteStripeThread)
+                        stripeIdx = ((IgniteStripeThread)Thread.currentThread()).stripeIndex();
+                    else
+                        stripeIdx = IgniteStripeThread.GRP_IDX_UNASSIGNED;
+
                     processNearAtomicUpdateRequest(
                         nodeId,
-                        req);
+                        req,
+                        stripeIdx);
                 }
 
                 @Override public String toString() {
@@ -1640,6 +1649,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /**
+     * Executes local update without an assigned stripe (uses {@code IgniteStripeThread.GRP_IDX_UNASSIGNED}).
+     *
+     * @param nodeId Node ID.
+     * @param req Update request.
+     * @param completionCb Completion callback.
+     */
+    public void updateAllAsyncInternal(
+        final UUID nodeId,
+        final GridNearAtomicAbstractUpdateRequest req,
+        final UpdateReplyClosure completionCb
+    ) {
+        updateAllAsyncInternal(nodeId, req, IgniteStripeThread.GRP_IDX_UNASSIGNED, completionCb);
+    }
+
+    /**
      * Executes local update.
      *
      * @param nodeId Node ID.
@@ -1649,6 +1673,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     void updateAllAsyncInternal(
         final UUID nodeId,
         final GridNearAtomicAbstractUpdateRequest req,
+        final int stripeIdx,
         final UpdateReplyClosure completionCb
     ) {
         IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
@@ -1667,7 +1692,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 return;
             }
 
-            updateAllAsyncInternal0(nodeId, req, completionCb);
+            updateAllAsyncInternal0(nodeId, req, stripeIdx, completionCb);
         }
         else {
             forceFut.listen(new CI1<IgniteInternalFuture<Object>>() {
@@ -1684,7 +1709,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         return;
                     }
 
-                    updateAllAsyncInternal0(nodeId, req, completionCb);
+                    updateAllAsyncInternal0(nodeId, req, stripeIdx, completionCb);
                 }
             });
         }
@@ -1718,11 +1743,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      *
      * @param nodeId Node ID.
      * @param req Update request.
+     * @param stripeIdx Stripe index.
      * @param completionCb Completion callback.
      */
     private void updateAllAsyncInternal0(
         UUID nodeId,
         GridNearAtomicAbstractUpdateRequest req,
+        int stripeIdx,
         UpdateReplyClosure completionCb
     ) {
         ClusterNode node = ctx.discovery().node(nodeId);
@@ -1741,6 +1768,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             false,
             ctx.deploymentEnabled());
 
+        res.partition(req.partition());
+
+        int[] stripeIdxs = null;
+
+        if (stripeIdx != IgniteStripeThread.GRP_IDX_UNASSIGNED
+            && req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
+            && ((GridNearAtomicFullUpdateRequest)req).stripeMap() != null) {
+            stripeIdxs = ((GridNearAtomicFullUpdateRequest)req).stripeMap().get(stripeIdx);
+
+            res.stripe(stripeIdx);
+        }
+
         assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
 
         GridDhtAtomicAbstractUpdateFuture dhtFut = null;
@@ -1765,7 +1804,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                 try {
                     if (top.stopping()) {
-                        res.addFailedKeys(req.keys(), new IgniteCheckedException("Failed to perform cache operation " +
+                        addAllKeysAsFailed(req, res, stripeIdxs, new IgniteCheckedException("Failed to perform cache operation " +
                             "(cache is stopped): " + name()));
 
                         completionCb.apply(req, res);
@@ -1776,7 +1815,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     // Do not check topology version if topology was locked on near node by
                     // external transaction or explicit lock.
                     if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
-                        locked = lockEntries(req, req.topologyVersion());
+                        locked = lockEntries(req, req.topologyVersion(), stripeIdxs);
 
                         boolean hasNear = ctx.discovery().cacheNearNode(node, name());
 
@@ -1795,13 +1834,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
 
-                        dhtFut = createDhtFuture(ver, req);
+                        int size = stripeIdxs == null ? req.size() : stripeIdxs.length;
+
+                        dhtFut = createDhtFuture(ver, req, size);
 
                         expiry = expiryPolicy(req.expiry());
 
                         GridCacheReturn retVal = null;
 
-                        if (req.size() > 1 &&                    // Several keys ...
+                        if (size > 1 &&                           // Several keys ...
                             writeThrough() && !req.skipStore() && // and store is enabled ...
                             !ctx.store().isLocal() &&             // and this is not local store ...
                                                                   // (conflict resolver should be used for local store)
@@ -1818,7 +1859,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 ctx.isDrEnabled(),
                                 taskName,
                                 expiry,
-                                sndPrevVal);
+                                sndPrevVal,
+                                stripeIdxs);
 
                             deleted = updRes.deleted();
                             dhtFut = updRes.dhtFuture();
@@ -1837,7 +1879,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 ctx.isDrEnabled(),
                                 taskName,
                                 expiry,
-                                sndPrevVal);
+                                sndPrevVal,
+                                stripeIdxs);
 
                             retVal = updRes.returnValue();
                             deleted = updRes.deleted();
@@ -1895,7 +1938,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             // an attempt to use cleaned resources.
             U.error(log, "Unexpected exception during cache update", e);
 
-            res.addFailedKeys(req.keys(), e);
+            addAllKeysAsFailed(req, res, stripeIdxs, e);
 
             completionCb.apply(req, res);
 
@@ -1921,6 +1964,27 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /**
+     * Adds all keys as failed to response.
+     *
+     * @param req Request.
+     * @param res Response.
+     * @param stripeIdx Indexes of the request keys to fail, or {@code null} to fail all request keys.
+     * @param e Throwable.
+     */
+    private void addAllKeysAsFailed(GridNearAtomicAbstractUpdateRequest req,
+        GridNearAtomicUpdateResponse res,
+        int[] stripeIdx,
+        Throwable e) {
+
+        if (stripeIdx == null)
+            res.addFailedKeys(req.keys(), e);
+        else {
+            for (int i = 0; i < stripeIdx.length; i++)
+                res.addFailedKey(req.key(stripeIdx[i]), e);
+        }
+    }
+
+    /**
      * Updates locked entries using batched write-through.
      *
      * @param node Sender node.
@@ -1934,6 +1998,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param taskName Task name.
      * @param expiry Expiry policy.
      * @param sndPrevVal If {@code true} sends previous value to backups.
+     * @param stripeIdxs Indexes of the request keys to process, or {@code null} to process all request keys.
      * @return Deleted entries.
      * @throws GridCacheEntryRemovedException Should not be thrown.
      */
@@ -1949,7 +2014,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final boolean replicate,
         final String taskName,
         @Nullable final IgniteCacheExpiryPolicy expiry,
-        final boolean sndPrevVal
+        final boolean sndPrevVal,
+        final int[] stripeIdxs
     ) throws GridCacheEntryRemovedException {
         assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts.
         assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll.
@@ -1959,13 +2025,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 reloadIfNeeded(locked);
             }
             catch (IgniteCheckedException e) {
-                res.addFailedKeys(req.keys(), e);
+                addAllKeysAsFailed(req, res, stripeIdxs, e);
 
                 return new UpdateBatchResult();
             }
         }
 
-        int size = req.size();
+        int size = stripeIdxs == null ? req.size() : stripeIdxs.length;
 
         Map<KeyCacheObject, CacheObject> putMap = null;
 
@@ -1990,6 +2056,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         for (int i = 0; i < locked.size(); i++) {
             GridDhtCacheEntry entry = locked.get(i);
 
+            int trueIdx = stripeIdxs == null ? i : stripeIdxs[i];
+
             try {
                 if (!checkFilter(entry, req, res)) {
                     if (expiry != null && entry.hasValue()) {
@@ -2017,7 +2085,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
 
                 if (op == TRANSFORM) {
-                    EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i);
+                    EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(trueIdx);
 
                     CacheObject old = entry.innerGet(
                         ver,
@@ -2099,7 +2167,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 updRes,
                                 taskName,
                                 expiry,
-                                sndPrevVal);
+                                sndPrevVal,
+                                stripeIdxs);
 
                             firstEntryIdx = i;
 
@@ -2147,7 +2216,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 updRes,
                                 taskName,
                                 expiry,
-                                sndPrevVal);
+                                sndPrevVal,
+                                stripeIdxs);
 
                             firstEntryIdx = i;
 
@@ -2172,12 +2242,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     entryProcessorMap.put(entry.key(), entryProcessor);
                 }
                 else if (op == UPDATE) {
-                    CacheObject updated = req.value(i);
+                    CacheObject updated = req.value(trueIdx);
 
                     if (intercept) {
                         CacheObject old = entry.innerGet(
-                             null,
-                             null,
+                            null,
+                            null,
                             /*read swap*/true,
                             /*read through*/ctx.loadPreviousValue(),
                             /*metrics*/true,
@@ -2273,7 +2343,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 updRes,
                 taskName,
                 expiry,
-                sndPrevVal);
+                sndPrevVal,
+                stripeIdxs);
         }
         else
             assert filtered.isEmpty();
@@ -2350,6 +2421,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param taskName Task name.
      * @param expiry Expiry policy.
      * @param sndPrevVal If {@code true} sends previous value to backups.
+     * @param stripeIdxs Stripe indexes.
      * @return Return value.
      * @throws GridCacheEntryRemovedException Should be never thrown.
      */
@@ -2364,7 +2436,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         boolean replicate,
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiry,
-        boolean sndPrevVal
+        boolean sndPrevVal,
+        int[] stripeIdxs
     ) throws GridCacheEntryRemovedException {
         GridCacheReturn retVal = null;
         Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
@@ -2377,9 +2450,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         AffinityAssignment affAssignment = ctx.affinity().assignment(topVer);
 
+        int keyNum = stripeIdxs == null ? req.size() : stripeIdxs.length;
+
         // Avoid iterator creation.
-        for (int i = 0; i < req.size(); i++) {
-            KeyCacheObject k = req.key(i);
+        for (int i = 0; i < keyNum; i++) {
+            int trueIdx = stripeIdxs == null ? i : stripeIdxs[i];
+
+            KeyCacheObject k = req.key(trueIdx);
 
             GridCacheOperation op = req.operation();
 
@@ -2388,13 +2465,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             try {
                 GridDhtCacheEntry entry = locked.get(i);
 
-                GridCacheVersion newConflictVer = req.conflictVersion(i);
-                long newConflictTtl = req.conflictTtl(i);
-                long newConflictExpireTime = req.conflictExpireTime(i);
+                GridCacheVersion newConflictVer = req.conflictVersion(trueIdx);
+                long newConflictTtl = req.conflictTtl(trueIdx);
+                long newConflictExpireTime = req.conflictExpireTime(trueIdx);
 
                 assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
 
-                Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i);
+                Object writeVal = op == TRANSFORM ? req.entryProcessor(trueIdx) : req.writeValue(trueIdx);
 
                 Collection<UUID> readers = null;
                 Collection<UUID> filteredReaders = null;
@@ -2478,13 +2555,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
                             // If put the same value as in request then do not need to send it back.
                             if (op == TRANSFORM || writeVal != updRes.newValue()) {
-                                res.addNearValue(i,
+                                res.addNearValue(trueIdx,
                                     updRes.newValue(),
                                     updRes.newTtl(),
                                     updRes.conflictExpireTime());
                             }
                             else
-                                res.addNearTtl(i, updRes.newTtl(), updRes.conflictExpireTime());
+                                res.addNearTtl(trueIdx, updRes.newTtl(), updRes.conflictExpireTime());
 
                             if (updRes.newValue() != null) {
                                 IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
@@ -2495,10 +2572,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         else if (F.contains(readers, nearNode.id())) // Reader became primary or backup.
                             entry.removeReader(nearNode.id(), req.messageId());
                         else
-                            res.addSkippedIndex(i);
+                            res.addSkippedIndex(trueIdx);
                     }
                     else
-                        res.addSkippedIndex(i);
+                        res.addSkippedIndex(trueIdx);
                 }
 
                 if (updRes.removeVersion() != null) {
@@ -2548,7 +2625,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
     /**
      * @param hasNear {@code True} if originating node has near cache.
-     * @param firstEntryIdx Index of the first entry in the request keys collection.
+     * @param firstEntryIdx Index of the first entry in the request keys collection, or in {@code stripeIdxs} when it is not {@code null}.
      * @param entries Entries to update.
      * @param ver Version to set.
      * @param nearNode Originating node.
@@ -2584,7 +2661,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final UpdateBatchResult batchRes,
         final String taskName,
         @Nullable final IgniteCacheExpiryPolicy expiry,
-        final boolean sndPrevVal
+        final boolean sndPrevVal,
+        final int[] stripeIdxs
     ) {
         assert putMap == null ^ rmvKeys == null;
 
@@ -2737,17 +2815,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     }
 
                     if (hasNear) {
+                        // Index of this entry among all request keys.
+                        int trueIdx = stripeIdxs == null ? firstEntryIdx + i : stripeIdxs[firstEntryIdx + i];
+
                         if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
                             int idx = firstEntryIdx + i;
 
                             if (req.operation() == TRANSFORM) {
-                                res.addNearValue(idx,
+                                res.addNearValue(trueIdx,
                                     writeVal,
                                     updRes.newTtl(),
                                     CU.EXPIRE_TIME_CALCULATE);
                             }
                             else
-                                res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
+                                res.addNearTtl(trueIdx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
 
                             if (writeVal != null || entry.hasValue()) {
                                 IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
@@ -2758,7 +2839,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         else if (readers.contains(nearNode.id())) // Reader became primary or backup.
                             entry.removeReader(nearNode.id(), req.messageId());
                         else
-                            res.addSkippedIndex(firstEntryIdx + i);
+                            res.addSkippedIndex(trueIdx);
                     }
                 }
                 catch (GridCacheEntryRemovedException e) {
@@ -2789,12 +2870,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      *
      * @param req Request with keys to lock.
      * @param topVer Topology version to lock on.
+     * @param stripeIdxs Stripe indexes.
      * @return Collection of locked entries.
      * @throws GridDhtInvalidPartitionException If entry does not belong to local node. If exception is thrown,
      *      locks are released.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    private List<GridDhtCacheEntry> lockEntries(GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer)
+    private List<GridDhtCacheEntry> lockEntries(GridNearAtomicAbstractUpdateRequest req,
+        AffinityTopologyVersion topVer,
+        int[] stripeIdxs)
         throws GridDhtInvalidPartitionException {
         if (req.size() == 1) {
             KeyCacheObject key = req.key(0);
@@ -2811,11 +2895,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
         else {
-            List<GridDhtCacheEntry> locked = new ArrayList<>(req.size());
+            int keysNum = stripeIdxs == null ? req.size() : stripeIdxs.length;
+            List<GridDhtCacheEntry> locked = new ArrayList<>(keysNum);
 
             while (true) {
-                for (int i = 0; i < req.size(); i++) {
-                    GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
+                for (int i = 0; i < keysNum; i++) {
+                    int idx = stripeIdxs == null ? i : stripeIdxs[i];
+
+                    GridDhtCacheEntry entry = entryExx(req.key(idx), topVer);
 
                     locked.add(entry);
                 }
@@ -3003,25 +3090,27 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     private GridDhtAtomicAbstractUpdateFuture createDhtFuture(
         GridCacheVersion writeVer,
-        GridNearAtomicAbstractUpdateRequest updateReq
+        GridNearAtomicAbstractUpdateRequest updateReq,
+        int size
     ) {
-        if (updateReq.size() == 1)
+        if (size == 1)
             return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq);
         else
-            return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq);
+            return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq, size);
     }
 
     /**
      * @param nodeId Sender node ID.
+     * @param stripeIdx Stripe number.
      * @param req Near atomic update request.
      */
-    private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
+    private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req, int stripeIdx) {
         if (msgLog.isDebugEnabled()) {
             msgLog.debug("Received near atomic update request [futId=" + req.futureId() +
-                ", node=" + nodeId + ']');
+                ", node=" + nodeId + ", stripe=" + stripeIdx + ']');
         }
 
-        updateAllAsyncInternal(nodeId, req, updateReplyClos);
+        updateAllAsyncInternal(nodeId, req, stripeIdx, updateReplyClos);
     }
 
     /**
@@ -3031,7 +3120,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @SuppressWarnings("unchecked")
     private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
         if (msgLog.isDebugEnabled())
-            msgLog.debug("Received near atomic update response [futId" + res.futureId() + ", node=" + nodeId + ']');
+            msgLog.debug("Received near atomic update response " +
+                "[futId=" + res.futureId() + ", node=" + nodeId + ", stripe=" + res.stripe() + ']');
 
         res.nodeId(ctx.localNodeId());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 5d5ddf0..af9fd8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -49,11 +49,12 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
     GridDhtAtomicUpdateFuture(
         GridCacheContext cctx,
         GridCacheVersion writeVer,
-        GridNearAtomicAbstractUpdateRequest updateReq
+        GridNearAtomicAbstractUpdateRequest updateReq,
+        int size
     ) {
         super(cctx, writeVer, updateReq);
 
-        mappings = U.newHashMap(updateReq.size());
+        mappings = U.newHashMap(size);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 39abb73..770999a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -138,7 +139,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
 
     /** Future ID. */
     @GridToStringInclude
-    protected Long futId;
+    protected long futId = -1;
 
     /** Operation result. */
     protected GridCacheReturn opRes;
@@ -356,7 +357,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      * @return Response to notify about primary failure.
      */
     final GridNearAtomicUpdateResponse primaryFailedResponse(GridNearAtomicAbstractUpdateRequest req) {
-        assert req.response() == null : req;
         assert req.nodeId() != null : req;
 
         if (msgLog.isDebugEnabled()) {
@@ -423,6 +423,12 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         final GridNearAtomicAbstractUpdateRequest req;
 
         /** */
+        private GridNearAtomicUpdateResponse res;
+
+        /** */
+        private Map<Integer, GridNearAtomicUpdateResponse> resMap;
+
+        /** */
         @GridToStringInclude
         Set<UUID> dhtNodes;
 
@@ -431,7 +437,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         private Set<UUID> rcvd;
 
         /** */
-        private boolean hasRes;
+        private boolean hasDhtRes;
 
         /**
          * @param req Request.
@@ -516,13 +522,24 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         }
 
         /**
+         * @return {@code True} if all near results gathered.
+         */
+        private boolean hasAllNearResults() {
+            return res != null || (resMap != null && resMap.size() == req.stripes());
+        }
+
+        public String state() {
+            return resMap != null ? resMap.size() + "/" + req.stripes() : res != null ? "res" : "no";
+        }
+
+        /**
          * @return {@code True} if all expected responses are received.
          */
         private boolean finished() {
             if (req.writeSynchronizationMode() == PRIMARY_SYNC)
-                return hasRes;
+                return hasAllNearResults();
 
-            return (dhtNodes != null && dhtNodes.isEmpty()) && hasRes;
+            return (dhtNodes != null && dhtNodes.isEmpty()) && hasDhtRes;
         }
 
         /**
@@ -536,13 +553,13 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
              * When primary failed, even if primary response is received, it is possible it failed to send
              * request to backup(s), need remap operation.
              */
-            if (req.fullSync() && !req.nodeFailedResponse()) {
-                req.resetResponse();
+            if (req.fullSync() && !nodeFailedResponse()) {
+                resetResponse();
 
                 return req;
             }
 
-            return req.response() == null ? req : null;
+            return hasAllNearResults() ? req : null;
         }
 
         /**
@@ -559,7 +576,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             if (finished())
                 return null;
 
-            return req.response() == null ? req : null;
+            return !hasAllNearResults() ? req : null;
         }
 
         /**
@@ -571,7 +588,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
                 return DhtLeftResult.NOT_DONE;
 
             if (dhtNodes.remove(nodeId) && dhtNodes.isEmpty()) {
-                if (hasRes)
+                if (hasDhtRes)
                     return DhtLeftResult.DONE;
                 else
                     return !req.needPrimaryResponse() ? DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE;
@@ -592,7 +609,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
                 return false;
 
             if (res.hasResult())
-                hasRes = true;
+                hasDhtRes = true;
 
             if (dhtNodes == null) {
                 if (rcvd == null)
@@ -614,9 +631,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         boolean onPrimaryResponse(GridNearAtomicUpdateResponse res, GridCacheContext cctx) {
             assert !finished() : this;
 
-            hasRes = true;
-
-            boolean onRes = req.onResponse(res);
+            boolean onRes = storeResponse(res);
 
             assert onRes;
 
@@ -639,7 +654,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
          * @param cctx Context.
          */
         private void initDhtNodes(List<UUID> nodeIds, GridCacheContext cctx) {
-            assert dhtNodes == null || req.initMappingLocally();
+            assert F.isEmpty(dhtNodes) || req.initMappingLocally();
 
             Set<UUID> dhtNodes0 = dhtNodes;
 
@@ -664,12 +679,67 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
                 dhtNodes = Collections.emptySet();
         }
 
+        /**
+         * @return {@code True} if received notification about primary fail.
+         */
+        boolean nodeFailedResponse() {
+            return res != null && res.nodeLeftResponse();
+        }
+
+        /**
+         * @param res Response.
+         * @return {@code True} if the response was stored, {@code false} if one was already recorded for its stripe or request.
+         */
+        private boolean storeResponse(GridNearAtomicUpdateResponse res) {
+            if (res.stripe() > -1) {
+                if (resMap == null)
+                    resMap = U.newHashMap(req.stripes());
+
+                if (!resMap.containsKey(res.stripe())) {
+                    resMap.put(res.stripe(), res);
+                    return true;
+                }
+            }
+            else {
+                if (this.res == null) {
+                    this.res = res;
+//                    this.resMap = null;
+
+                    return true;
+                }
+            }
+
+            return false;
+        }
+
+        /**
+         * Clears the single response and the per-stripe responses.
+         */
+        void resetResponse() {
+            res = null;
+            resMap = null;
+        }
+
+        /**
+         * @return Non-striped response, or {@code null} if none is stored.
+         */
+        @Nullable public GridNearAtomicUpdateResponse response() {
+            return res;
+        }
+
+        /**
+         * @return Responses keyed by stripe, or {@code null} if none is stored.
+         */
+        @Nullable public Map<Integer, GridNearAtomicUpdateResponse> responses() {
+            return resMap;
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(PrimaryRequestState.class, this,
                 "primary", primaryId(),
                 "needPrimaryRes", req.needPrimaryResponse(),
-                "primaryRes", req.response() != null,
+                "primaryRes", this.res != null,
                 "done", finished());
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index a43bfb0..b549370 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -89,10 +89,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     @GridToStringExclude
     protected byte flags;
 
-    /** */
-    @GridDirectTransient
-    private GridNearAtomicUpdateResponse res;
-
     /**
      *
      */
@@ -249,41 +245,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     }
 
     /**
-     * @param res Response.
-     * @return {@code True} if current response was {@code null}.
-     */
-    public boolean onResponse(GridNearAtomicUpdateResponse res) {
-        if (this.res == null) {
-            this.res = res;
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     *
-     */
-    void resetResponse() {
-        this.res = null;
-    }
-
-    /**
-     * @return Response.
-     */
-    @Nullable public GridNearAtomicUpdateResponse response() {
-        return res;
-    }
-
-    /**
-     * @return {@code True} if received notification about primary fail.
-     */
-    boolean nodeFailedResponse() {
-        return res != null && res.nodeLeftResponse();
-    }
-
-    /**
      * @return Topology locked flag.
      */
     final boolean topologyLocked() {
@@ -451,6 +412,13 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     public abstract int size();
 
     /**
+     * @return Number of stripes.
+     */
+    public int stripes() {
+        return 0;
+    }
+
+    /**
      * @param idx Key index.
      * @return Key.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index c381333..5a8c66b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -21,13 +21,16 @@ import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -40,7 +43,6 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -61,6 +63,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     /** */
     private static final long serialVersionUID = 0L;
 
+    public static final int DIRECT_TYPE = 40;
+
     /** Keys to update. */
     @GridToStringInclude
     @GridDirectCollection(KeyCacheObject.class)
@@ -70,6 +74,14 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     @GridDirectCollection(CacheObject.class)
     private List<CacheObject> vals;
 
+    /** Number of key indexes recorded so far for each stripe. */
+    @GridDirectTransient
+    private int[] stripeCnt;
+
+    /** Stripe to key indexes mapping. */
+    @GridDirectMap(keyType = int.class, valueType = int[].class)
+    private Map<Integer, int[]> stripeMap;
+
     /** Entry processors. */
     @GridDirectTransient
     private List<EntryProcessor<Object, Object, Object>> entryProcessors;
@@ -109,6 +121,17 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     @GridDirectTransient
     private int initSize;
 
+    /** Maximum number of keys. */
+    @GridDirectTransient
+    private int maxEntryCnt;
+
+    /** Number of stripes on remote node. */
+    @GridDirectTransient
+    private int maxStripes;
+
+    /** Partition ID. */
+    private int partId;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -155,7 +178,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         boolean skipStore,
         boolean keepBinary,
         boolean addDepInfo,
-        int maxEntryCnt
+        int maxEntryCnt,
+        int maxStripes
     ) {
         super(cacheId,
             nodeId,
@@ -181,6 +205,9 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         // than 10, we use it.
         initSize = Math.min(maxEntryCnt, 10);
 
+        this.maxEntryCnt = maxEntryCnt;
+        this.maxStripes = maxStripes;
+
         keys = new ArrayList<>(initSize);
     }
 
@@ -200,6 +227,25 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
         assert val != null || op == DELETE;
 
+        if (maxStripes > 1) {
+            int stripe = key.partition() % maxStripes;
+
+            if (stripeCnt == null)
+                stripeCnt = new int[maxStripes];
+
+            if (stripeMap == null)
+                stripeMap = new HashMap<>(maxStripes);
+
+            int[] idxs = stripeMap.get(stripe);
+
+            if (idxs == null)
+                stripeMap.put(stripe, idxs = new int[maxEntryCnt]);
+
+            int idx = stripeCnt[stripe];
+            idxs[idx] = keys.size();
+            stripeCnt[stripe] = idx + 1;
+        }
+
         keys.add(key);
 
         if (entryProcessor != null) {
@@ -267,6 +313,11 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     }
 
     /** {@inheritDoc} */
+    @Override public int stripes() {
+        return stripeMap.size();
+    }
+
+    /** {@inheritDoc} */
     @Override public KeyCacheObject key(int idx) {
         return keys.get(idx);
     }
@@ -353,6 +404,13 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         return expiryPlc;
     }
 
+    /**
+     * @return Stripe mapping.
+     */
+    @Nullable public Map<Integer, int[]> stripeMap() {
+        return stripeMap;
+    }
+
     /** {@inheritDoc} */
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
@@ -362,6 +420,14 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         if (expiryPlc != null && expiryPlcBytes == null)
             expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
 
+        if (stripeMap != null && stripeCnt != null) {
+            for (Integer idx : stripeMap.keySet()) {
+                stripeMap.put(idx, Arrays.copyOf(stripeMap.get(idx), stripeCnt[idx]));
+            }
+
+            stripeCnt = null;
+        }
+
         prepareMarshalCacheObjects(keys, cctx);
 
         if (filter != null) {
@@ -425,9 +491,14 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
     /** {@inheritDoc} */
     @Override public int partition() {
-        assert !F.isEmpty(keys);
+        return partId;
+    }
 
-        return keys.get(0).partition();
+    /**
+     * @param partId Partition ID to be returned by {@link #partition()}.
+     */
+    public void partition(int partId) {
+        this.partId = partId;
     }
 
     /** {@inheritDoc} */
@@ -494,6 +565,18 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
                 writer.incrementState();
 
             case 18:
+                if (!writer.writeInt("partId", partId))
+                    return false;
+
+                writer.incrementState();
+
+            case 19:
+                if (!writer.writeMap("stripeMap", stripeMap, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
+                    return false;
+
+                writer.incrementState();
+
+            case 20:
                 if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
                     return false;
 
@@ -580,6 +663,22 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
                 reader.incrementState();
 
             case 18:
+                partId = reader.readInt("partId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 19:
+                stripeMap = reader.readMap("stripeMap", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 20:
                 vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -594,24 +693,24 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
     /** {@inheritDoc} */
     @Override public void cleanup(boolean clearKeys) {
-        vals = null;
-        entryProcessors = null;
-        entryProcessorsBytes = null;
-        invokeArgs = null;
-        invokeArgsBytes = null;
-
-        if (clearKeys)
-            keys = null;
+        // NOTE(review): this commit turns cleanup into a no-op. The previous body nulled
+        // vals, entryProcessors, entryProcessorsBytes, invokeArgs and invokeArgsBytes,
+        // and additionally keys when clearKeys was set.
+        //
+        // Presumably the request content has to stay readable after sending (e.g. for
+        // per-stripe near cache updates) - TODO confirm. Until this is settled the
+        // clearKeys argument is ignored and nothing is released here; either restore
+        // the cleanup or document the new contract before merging.
     }
 
     /** {@inheritDoc} */
     @Override public byte directType() {
-        return 40;
+        return DIRECT_TYPE;
     }
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 19;
+        return 21;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 930c4af..2b6d2c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -125,7 +125,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     }
 
     /** {@inheritDoc} */
-    @Override public Long id() {
+    @Override public long id() {
         synchronized (mux) {
             return futId;
         }
@@ -216,7 +216,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         AffinityTopologyVersion remapTopVer0;
 
         synchronized (mux) {
-            if (futId == null || futId != res.futureId())
+            if (futId == -1 || futId != res.futureId())
                 return;
 
             assert reqState != null;
@@ -258,7 +258,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         CachePartialUpdateCheckedException err0 = null;
 
         synchronized (mux) {
-            if (futId == null || futId != res.futureId())
+            if (futId == -1 || futId != res.futureId())
                 return;
 
             req = reqState.processPrimaryResponse(nodeId, res);
@@ -331,7 +331,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      * @return Non-null topology version if update should be remapped.
      */
     private AffinityTopologyVersion onAllReceived() {
-        assert futId != null;
+        assert futId != -1;
 
         AffinityTopologyVersion remapTopVer0 = null;
 
@@ -362,7 +362,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             cctx.mvcc().removeAtomicFuture(futId);
 
             reqState = null;
-            futId = null;
+            futId = -1;
             topVer = AffinityTopologyVersion.ZERO;
 
             remapTopVer = null;
@@ -488,7 +488,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             reqState0 = mapSingleUpdate(topVer, futId);
 
             synchronized (mux) {
-                assert this.futId == null : this;
+                assert this.futId == -1 : this;
                 assert this.topVer == AffinityTopologyVersion.ZERO : this;
 
                 this.topVer = topVer;
@@ -530,7 +530,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      * @param futId
      * @return
      */
-    private boolean checkDhtNodes(Long futId) {
+    private boolean checkDhtNodes(long futId) {
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
         AffinityTopologyVersion remapTopVer0 = null;
@@ -538,7 +538,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         GridNearAtomicCheckUpdateRequest checkReq = null;
 
         synchronized (mux) {
-            if (this.futId == null || !this.futId.equals(futId))
+            if (this.futId == -1 || this.futId != futId)
                 return false;
 
             assert reqState != null;
@@ -568,13 +568,13 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     /**
      * @return Future ID.
      */
-    private Long onFutureDone() {
-        Long id0;
+    private long onFutureDone() {
+        long id0; // Primitive, like the field and the return type: avoids a box/unbox round trip.
 
         synchronized (mux) {
             id0 = futId;
 
-            futId = null;
+            futId = -1; // -1 is the "no active future ID" marker checked by the response handlers.
         }
 
         return id0;
@@ -694,6 +694,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 skipStore,
                 keepBinary,
                 cctx.deploymentEnabled(),
+                1,
                 1);
         }
 
@@ -721,9 +722,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         }
 
         if (nearEnabled) {
-            assert reqState.req.response() != null;
+            assert reqState.response() != null;
 
-            updateNear(reqState.req, reqState.req.response());
+            updateNear(reqState.req, reqState.response());
         }
 
         onDone(opRes, err);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index a44ccf9..9c4de66 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -57,6 +57,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_STRIPES_CNT;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 
 /**
@@ -150,7 +151,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     }
 
     /** {@inheritDoc} */
-    @Override public Long id() {
+    @Override public long id() {
         synchronized (mux) {
             return futId;
         }
@@ -167,7 +168,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         List<GridNearAtomicCheckUpdateRequest> checkReqs = null;
 
         synchronized (mux) {
-            if (futId == null)
+            if (futId == -1)
                 return false;
 
             if (singleReq != null) {
@@ -282,10 +283,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             retval = Collections.emptyMap();
 
         if (super.onDone(retval, err)) {
-            Long futId = onFutureDone();
+            long futVer = onFutureDone();
 
-            if (futId != null)
-                cctx.mvcc().removeAtomicFuture(futId);
+            if (futVer > -1)
+                cctx.mvcc().removeAtomicFuture(futVer);
 
             return true;
         }
@@ -300,7 +301,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         AffinityTopologyVersion remapTopVer0;
 
         synchronized (mux) {
-            if (futId == null || futId != res.futureId())
+            if (futId == -1 || futId != res.futureId())
                 return;
 
             PrimaryRequestState reqState;
@@ -370,10 +371,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
 
-        boolean rcvAll;
+        boolean rcvAll = false;
 
         synchronized (mux) {
-            if (futId == null || futId != res.futureId())
+            if (futId == -1 || futId != res.futureId())
                 return;
 
             if (singleReq != null) {
@@ -408,6 +409,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
                         rcvAll = false;
                     }
+                    if (msgLog.isDebugEnabled())
+                        msgLog.debug("Processed near atomic update response " +
+                            "[futId=" + res.futureId() + ", node=" + nodeId + ", stripe=" + res.stripe() +
+                            ": " + resCnt + "/" + mappings.size() + (rcvAll ? " all done" : "") +
+                            " " + reqState.state() + ']');
                 }
                 else
                     return;
@@ -466,11 +472,18 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         if (rcvAll && nearEnabled) {
             if (mappings != null) {
                 for (PrimaryRequestState reqState : mappings.values()) {
-                    GridNearAtomicUpdateResponse res0 = reqState.req.response();
+                    if (reqState.responses() != null) {
+                        for (GridNearAtomicUpdateResponse res0 : reqState.responses().values()) {
+                            updateNear(reqState.req, res0);
+                        }
+                    }
+                    else {
+                        GridNearAtomicUpdateResponse res0 = reqState.response();
 
-                    assert res0 != null : reqState;
+                        assert res0 != null : reqState;
 
-                    updateNear(reqState.req, res0);
+                        updateNear(reqState.req, res0);
+                    }
                 }
             }
             else if (!nodeErr)
@@ -534,7 +547,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      * @return Non null topology version if update should be remapped.
      */
     @Nullable private AffinityTopologyVersion onAllReceived() {
-        assert futId != null;
+        assert futId != -1;
 
         AffinityTopologyVersion remapTopVer0 = null;
 
@@ -577,7 +590,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         if (remapTopVer0 != null) {
             cctx.mvcc().removeAtomicFuture(futId);
 
-            futId = null;
+            futId = -1;
             topVer = AffinityTopologyVersion.ZERO;
 
             remapTopVer = null;
@@ -596,7 +609,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         if (nearEnabled) {
             if (mappings != null) {
                 for (PrimaryRequestState reqState : mappings.values()) {
-                    GridNearAtomicUpdateResponse res0 = reqState.req.response();
+                    GridNearAtomicUpdateResponse res0 = reqState.response();
 
                     assert res0 != null : reqState;
 
@@ -604,9 +617,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 }
             }
             else {
-                assert singleReq != null && singleReq.req.response() != null;
+                assert singleReq != null && singleReq.response() != null;
 
-                updateNear(singleReq.req, singleReq.req.response());
+                updateNear(singleReq.req, singleReq.response());
             }
         }
 
@@ -767,7 +780,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             return;
         }
 
-        Long futId = cctx.mvcc().atomicFutureId();
+        long futId = cctx.mvcc().atomicFutureId();
 
         Exception err = null;
         PrimaryRequestState singleReq0 = null;
@@ -801,7 +814,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             }
 
             synchronized (mux) {
-                assert this.futId == null : this;
+                assert this.futId == -1 : this;
                 assert this.topVer == AffinityTopologyVersion.ZERO : this;
 
                 this.topVer = topVer;
@@ -866,7 +879,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         boolean rcvAll = false;
 
         synchronized (mux) {
-            if (this.futId == null || !this.futId.equals(futId))
+            if (this.futId == -1 || this.futId != futId)
                 return;
 
             if (singleReq != null) {
@@ -934,13 +947,13 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     /**
      * @return Future version.
      */
-    private Long onFutureDone() {
-        Long id0;
+    private long onFutureDone() {
+        long id0;
 
         synchronized (mux) {
             id0 = futId;
 
-            futId = null;
+            futId = -1;
         }
 
         return id0;
@@ -957,7 +970,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     @SuppressWarnings("ForLoopReplaceableByForEach")
     private Map<UUID, PrimaryRequestState> mapUpdate(Collection<ClusterNode> topNodes,
         AffinityTopologyVersion topVer,
-        Long futId,
+        long futId,
         @Nullable Collection<KeyCacheObject> remapKeys,
         boolean mappingKnown) throws Exception {
         Iterator<?> it = null;
@@ -977,6 +990,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         Map<UUID, PrimaryRequestState> pendingMappings = U.newHashMap(topNodes.size());
 
+        int part = (int)(futId & 0xFFFF);
+
         // Create mappings first, then send messages.
         for (Object key : keys) {
             if (key == null)
@@ -1045,6 +1060,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             PrimaryRequestState mapped = pendingMappings.get(nodeId);
 
             if (mapped == null) {
+                int stripes = primary.attribute(ATTR_STRIPES_CNT) != null ? (int)primary.attribute(ATTR_STRIPES_CNT) : -1;
+
                 GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
                     cctx.cacheId(),
                     nodeId,
@@ -1063,7 +1080,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     skipStore,
                     keepBinary,
                     cctx.deploymentEnabled(),
-                    keys.size());
+                    keys.size(),
+                    stripes
+                );
+
+                req.partition(part);
 
                 mapped = new PrimaryRequestState(req, nodes, false);
 
@@ -1086,7 +1107,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      * @return Request.
      * @throws Exception If failed.
      */
-    private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, Long futId, boolean mappingKnown)
+    private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, long futId, boolean mappingKnown)
         throws Exception {
         Object key = F.first(keys);
 
@@ -1168,6 +1189,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             skipStore,
             keepBinary,
             cctx.deploymentEnabled(),
+            1,
             1);
 
         req.addUpdateEntry(cacheKey,

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 4e20fc7..f8b45d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -96,6 +96,9 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     /** Partition ID. */
     private int partId = -1;
 
+    /** Stripe. */
+    private int stripe = -1;
+
     /** */
     @GridDirectCollection(UUID.class)
     @GridToStringInclude
@@ -182,6 +185,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     }
 
     /**
+     * @param partId Partition ID for proper striping on near node.
+     */
+    public void partition(int partId) {
+        this.partId = partId;
+    }
+
+    /**
      * Sets update error.
      *
      * @param err Error.
@@ -427,6 +437,20 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
         return partId;
     }
 
+    /**
+     * @return Stripe number, {@code -1} if it was not set.
+     */
+    public int stripe() {
+        return stripe;
+    }
+
+    /**
+     * @param stripe Stripe number.
+     */
+    public void stripe(int stripe) {
+        this.stripe = stripe;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean addDeploymentInfo() {
         return addDepInfo;
@@ -524,6 +548,12 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
                 writer.incrementState();
 
+            case 15:
+                if (!writer.writeInt("stripe", stripe))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -636,6 +666,14 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
                 reader.incrementState();
 
+            case 15:
+                stripe = reader.readInt("stripe");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridNearAtomicUpdateResponse.class);
@@ -648,7 +686,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 15;
+        return 16; // Last state handled in writeTo/readFrom is 15 (stripe).
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 62aecd1..5844021 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -130,7 +131,19 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
         GridNearAtomicAbstractUpdateRequest req,
         GridNearAtomicUpdateResponse res
     ) {
-        if (F.size(res.failedKeys()) == req.size())
+        int keyNum;
+        int[] stripeIdxs;
+
+        if (res.stripe() > -1 && req instanceof GridNearAtomicFullUpdateRequest) {
+            stripeIdxs = ((GridNearAtomicFullUpdateRequest)req).stripeMap().get(res.stripe());
+            keyNum = stripeIdxs.length;
+        }
+        else {
+            stripeIdxs = null;
+            keyNum = req.keys().size();
+        }
+
+        if (F.size(res.failedKeys()) == keyNum)
             return;
 
         /*
@@ -150,11 +163,13 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
 
         String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
 
-        for (int i = 0; i < req.size(); i++) {
-            if (F.contains(skipped, i))
+        for (int i = 0; i < keyNum; i++) {
+            int trueIdx = stripeIdxs == null ? i : stripeIdxs[i];
+
+            if (F.contains(skipped, trueIdx))
                 continue;
 
-            KeyCacheObject key = req.key(i);
+            KeyCacheObject key = req.key(trueIdx);
 
             if (F.contains(failed, key))
                 continue;
@@ -170,7 +185,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
 
             CacheObject val = null;
 
-            if (F.contains(nearValsIdxs, i)) {
+            if (F.contains(nearValsIdxs, trueIdx)) {
                 val = res.nearValue(nearValIdx);
 
                 nearValIdx++;
@@ -179,7 +194,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                 assert req.operation() != TRANSFORM;
 
                 if (req.operation() != DELETE)
-                    val = req.value(i);
+                    val = req.value(trueIdx);
             }
 
             long ttl = res.nearTtl(i);