You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/04 16:35:03 UTC

[1/3] ignite git commit: Fixed wrong 'send' method usage in GridIoManager.

Repository: ignite
Updated Branches:
  refs/heads/ignite-4705-2 4ab159def -> 46c71ae51


Fixed wrong 'send' method usage in GridIoManager.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93e19962
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93e19962
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93e19962

Branch: refs/heads/ignite-4705-2
Commit: 93e19962114194072151840198f04f3406be068a
Parents: 50f8741
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 3 16:39:22 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 3 16:39:22 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/GridJobSiblingImpl.java     |   4 +-
 .../internal/managers/GridManagerAdapter.java   |   2 +-
 .../checkpoint/GridCheckpointManager.java       |   2 +-
 .../managers/communication/GridIoManager.java   | 206 +++++++------------
 .../deployment/GridDeploymentCommunication.java |   6 +-
 .../eventstorage/GridEventStorageManager.java   |   6 +-
 .../processors/cache/GridCacheIoManager.java    |   8 +-
 .../cache/transactions/IgniteTxManager.java     |   6 +-
 .../clock/GridClockSyncProcessor.java           |   2 +-
 .../continuous/GridContinuousProcessor.java     |   2 +-
 .../datastreamer/DataStreamProcessor.java       |   2 +-
 .../datastreamer/DataStreamerImpl.java          |   2 +-
 .../internal/processors/igfs/IgfsContext.java   |   8 +-
 .../processors/job/GridJobProcessor.java        |   4 +-
 .../internal/processors/job/GridJobWorker.java  |   2 +-
 .../marshaller/ClientRequestFuture.java         |   2 +-
 .../GridMarshallerMappingProcessor.java         |   2 +-
 .../handlers/task/GridTaskCommandHandler.java   |   4 +-
 .../processors/task/GridTaskProcessor.java      |   2 +-
 .../processors/task/GridTaskWorker.java         |   4 +-
 .../GridCommunicationSendMessageSelfTest.java   |   2 +-
 .../communication/GridIoManagerSelfTest.java    |  28 +--
 .../nio/IgniteExceptionInNioWorkerSelfTest.java |   2 +-
 .../communication/GridIoManagerBenchmark.java   |   4 +-
 .../communication/GridIoManagerBenchmark0.java  |  14 +-
 .../communication/GridCacheMessageSelfTest.java |   2 +-
 .../hadoop/shuffle/HadoopShuffle.java           |   3 +-
 .../processors/query/h2/IgniteH2Indexing.java   |   2 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |   6 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |   2 +-
 30 files changed, 140 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
index 2d95f85..79ac416 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
@@ -150,7 +150,7 @@ public class GridJobSiblingImpl implements ComputeJobSibling, Externalizable {
 
             if (!nodes.isEmpty()) {
                 try {
-                    ctx.io().send(nodes, TOPIC_JOB_CANCEL, new GridJobCancelRequest(sesId, jobId), SYSTEM_POOL);
+                    ctx.io().sendToGridTopic(nodes, TOPIC_JOB_CANCEL, new GridJobCancelRequest(sesId, jobId), SYSTEM_POOL);
                 }
                 catch (IgniteCheckedException e) {
                     throw U.convertException(e);
@@ -169,7 +169,7 @@ public class GridJobSiblingImpl implements ComputeJobSibling, Externalizable {
                 ctx.job().cancelJob(ses.getId(), jobId, false);
             else {
                 try {
-                    ctx.io().send(node, TOPIC_JOB_CANCEL, new GridJobCancelRequest(ses.getId(), jobId), SYSTEM_POOL);
+                    ctx.io().sendToGridTopic(node, TOPIC_JOB_CANCEL, new GridJobCancelRequest(ses.getId(), jobId), SYSTEM_POOL);
                 }
                 catch (IgniteCheckedException e) {
                     // Avoid stack trace for left nodes.

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index e864916..d993376 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -391,7 +391,7 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
 
                         try {
                             if (msg instanceof Message)
-                                ctx.io().send(node, topic, (Message)msg, SYSTEM_POOL);
+                                ctx.io().sendToCustomTopic(node, topic, (Message)msg, SYSTEM_POOL);
                             else
                                 ctx.io().sendUserMessage(Collections.singletonList(node), msg, topic, false, 0, false);
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
index 9124caf..8ce8b0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
@@ -239,7 +239,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
                             ClusterNode node = ctx.discovery().node(ses.getTaskNodeId());
 
                             if (node != null)
-                                ctx.io().send(
+                                ctx.io().sendToGridTopic(
                                     node,
                                     TOPIC_CHECKPOINT,
                                     new GridCheckpointRequest(ses.getId(), key, ses.getCheckpointSpi()),

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 38b5441..5e91ea9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -191,6 +191,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /** */
     private final AtomicLong ioTestId = new AtomicLong();
 
+    /** No-op runnable. */
+    private static final IgniteRunnable NOOP = new IgniteRunnable() {
+        @Override public void run() {
+            // No-op.
+        }
+    };
+
     /**
      * @param ctx Grid kernal context.
      */
@@ -328,7 +335,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                     res.flags(msg0.flags());
 
                     try {
-                        send(node, GridTopic.TOPIC_IO_TEST, res, GridIoPolicy.SYSTEM_POOL);
+                        sendToGridTopic(node, GridTopic.TOPIC_IO_TEST, res, GridIoPolicy.SYSTEM_POOL);
                     }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to send IO test response [msg=" + msg0 + "]", e);
@@ -367,7 +374,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             ClusterNode node = nodes.get(i);
 
             try {
-                send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
+                sendToGridTopic(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
             }
             catch (IgniteCheckedException e) {
                 ioTestMap().remove(msg.id());
@@ -397,7 +404,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         ioTestMap().put(id, fut);
 
         try {
-            send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
+            sendToGridTopic(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
         }
         catch (IgniteCheckedException e) {
             ioTestMap().remove(msg.id());
@@ -791,8 +798,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 finally {
                     threadProcessingMessage(false);
 
-                    if (msgC != null)
-                        msgC.run();
+                    msgC.run();
                 }
             }
 
@@ -915,46 +921,46 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * Remove listener if it matches expected value.
      *
      * @param topic Topic.
-     * @param expected Listener.
+     * @param exp Listener.
      * @return Result.
      */
-    private boolean listenerRemove0(Object topic, GridMessageListener expected) {
+    private boolean listenerRemove0(Object topic, GridMessageListener exp) {
         if (topic instanceof GridTopic) {
             synchronized (sysLsnrsMux) {
-                return systemListenerChange(topic, expected, null);
+                return systemListenerChange(topic, exp, null);
             }
         }
         else
-            return lsnrMap.remove(topic, expected);
+            return lsnrMap.remove(topic, exp);
     }
 
     /**
      * Replace listener.
      *
      * @param topic Topic.
-     * @param expected Old value.
+     * @param exp Old value.
      * @param newVal New value.
      * @return Result.
      */
-    private boolean listenerReplace0(Object topic, GridMessageListener expected, GridMessageListener newVal) {
+    private boolean listenerReplace0(Object topic, GridMessageListener exp, GridMessageListener newVal) {
         if (topic instanceof GridTopic) {
             synchronized (sysLsnrsMux) {
-                return systemListenerChange(topic, expected, newVal);
+                return systemListenerChange(topic, exp, newVal);
             }
         }
         else
-            return lsnrMap.replace(topic, expected, newVal);
+            return lsnrMap.replace(topic, exp, newVal);
     }
 
     /**
      * Change system listener.
      *
      * @param topic Topic.
-     * @param expected Expected value.
+     * @param exp Expected value.
      * @param newVal New value.
      * @return Result.
      */
-    private boolean systemListenerChange(Object topic, GridMessageListener expected, GridMessageListener newVal) {
+    private boolean systemListenerChange(Object topic, GridMessageListener exp, GridMessageListener newVal) {
         assert Thread.holdsLock(sysLsnrsMux);
         assert topic instanceof GridTopic;
 
@@ -962,7 +968,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         GridMessageListener old = sysLsnrs[idx];
 
-        if (old != null && old.equals(expected)) {
+        if (old != null && old.equals(exp)) {
             changeSystemListener(idx, newVal);
 
             return true;
@@ -1263,6 +1269,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         assert node != null;
         assert topic != null;
         assert msg != null;
+        assert !async || msg instanceof GridIoUserMessage : msg; // Async execution was added only for IgniteMessaging.
+        assert topicOrd >= 0 || !(topic instanceof GridTopic) : msg;
 
         GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
 
@@ -1276,11 +1284,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
             if (ordered)
                 processOrderedMessage(locNodeId, ioMsg, plc, null);
-            else if (async) {
-                assert msg instanceof GridIoUserMessage : ioMsg; // Async execution was added only for IgniteMessaging.
-
-                processRegularMessage(locNodeId, ioMsg, plc, null);
-            }
+            else if (async)
+                processRegularMessage(locNodeId, ioMsg, plc, NOOP);
             else
                 processRegularMessage0(ioMsg, locNodeId);
 
@@ -1313,14 +1318,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param plc Type of processing.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void send(UUID nodeId, Object topic, Message msg, byte plc)
+    public void sendToCustomTopic(UUID nodeId, Object topic, Message msg, byte plc)
         throws IgniteCheckedException {
         ClusterNode node = ctx.discovery().node(nodeId);
 
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
 
-        send(node, topic, msg, plc);
+        sendToCustomTopic(node, topic, msg, plc);
     }
 
     /**
@@ -1331,7 +1336,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     @SuppressWarnings("TypeMayBeWeakened")
-    public void send(UUID nodeId, GridTopic topic, Message msg, byte plc)
+    public void sendToGridTopic(UUID nodeId, GridTopic topic, Message msg, byte plc)
         throws IgniteCheckedException {
         ClusterNode node = ctx.discovery().node(nodeId);
 
@@ -1348,7 +1353,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param plc Type of processing.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void send(ClusterNode node, Object topic, Message msg, byte plc)
+    public void sendToCustomTopic(ClusterNode node, Object topic, Message msg, byte plc)
         throws IgniteCheckedException {
         send(node, topic, -1, msg, plc, false, 0, false, null, false);
     }
@@ -1358,12 +1363,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param topic Topic to send the message to.
      * @param msg Message to send.
      * @param plc Type of processing.
-     * @param async Async flag.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async)
+    public void sendToGridTopic(ClusterNode node, GridTopic topic, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, async);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
     }
 
     /**
@@ -1374,7 +1378,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param plc Type of processing.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void send(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc)
+    public void sendGeneric(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc)
         throws IgniteCheckedException {
         send(node, topic, topicOrd, msg, plc, false, 0, false, null, false);
     }
@@ -1402,33 +1406,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
-     * @param nodeId Destination node.
-     * @param topic Topic to send the message to.
-     * @param msg Message to send.
-     * @param plc Type of processing.
-     * @param timeout Timeout to keep a message on receiving queue.
-     * @param skipOnTimeout Whether message can be skipped on timeout.
-     * @throws IgniteCheckedException Thrown in case of any errors.
-     */
-    public void sendOrderedMessage(
-        UUID nodeId,
-        Object topic,
-        Message msg,
-        byte plc,
-        long timeout,
-        boolean skipOnTimeout
-    ) throws IgniteCheckedException {
-        assert timeout > 0 || skipOnTimeout;
-
-        ClusterNode node = ctx.discovery().node(nodeId);
-
-        if (node == null)
-            throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
-
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false);
-    }
-
-    /**
      * @param node Destination nodes.
      * @param topic Topic to send the message to.
      * @param msg Message to send.
@@ -1436,8 +1413,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param ackC Ack closure.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void send(ClusterNode node, GridTopic topic, Message msg, byte plc,
-        IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
+    public void sendToGridTopic(ClusterNode node,
+        GridTopic topic,
+        Message msg,
+        byte plc,
+        IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException
+    {
         send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC, false);
     }
 
@@ -1450,9 +1431,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param skipOnTimeout Whether message can be skipped on timeout.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void sendOrderedMessage(
+    void sendOrderedMessageToGridTopic(
         Collection<? extends ClusterNode> nodes,
-        Object topic,
+        GridTopic topic,
         Message msg,
         byte plc,
         long timeout,
@@ -1461,36 +1442,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(nodes, topic, -1, msg, plc, true, timeout, skipOnTimeout);
-    }
-
-    /**
-     * @param node Destination nodes.
-     * @param topic Topic to send the message to.
-     * @param msg Message to send.
-     * @param plc Type of processing.
-     * @param ackC Ack closure.
-     * @throws IgniteCheckedException Thrown in case of any errors.
-     */
-    public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC)
-        throws IgniteCheckedException {
-        send(node, topic, -1, msg, plc, false, 0, false, ackC, false);
-    }
-
-    /**
-     * @param nodes Destination node.
-     * @param topic Topic to send the message to.
-     * @param msg Message to send.
-     * @param plc Type of processing.
-     * @throws IgniteCheckedException Thrown in case of any errors.
-     */
-    public void send(
-        Collection<? extends ClusterNode> nodes,
-        Object topic,
-        Message msg,
-        byte plc
-    ) throws IgniteCheckedException {
-        send(nodes, topic, -1, msg, plc, false, 0, false);
+        send(nodes, topic, topic.ordinal(), msg, plc, true, timeout, skipOnTimeout);
     }
 
     /**
@@ -1500,7 +1452,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param plc Type of processing.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void send(
+    public void sendToGridTopic(
         Collection<? extends ClusterNode> nodes,
         GridTopic topic,
         Message msg,
@@ -1540,7 +1492,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param msg Message to send.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg) throws IgniteCheckedException {
+    void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg) throws IgniteCheckedException {
         sendUserMessage(nodes, msg, null, false, 0, false);
     }
 
@@ -1556,8 +1508,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     @SuppressWarnings("ConstantConditions")
-    public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg,
-        @Nullable Object topic, boolean ordered, long timeout, boolean async) throws IgniteCheckedException {
+    public void sendUserMessage(Collection<? extends ClusterNode> nodes,
+        Object msg,
+        @Nullable Object topic,
+        boolean ordered,
+        long timeout,
+        boolean async) throws IgniteCheckedException
+    {
         boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(locNodeId);
 
         byte[] serMsg = null;
@@ -1600,22 +1557,42 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             dep != null ? dep.participants() : null);
 
         if (ordered)
-            sendOrderedMessage(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true);
-        else if (loc)
-            send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async);
+            sendOrderedMessageToGridTopic(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true);
+        else if (loc) {
+            send(F.first(nodes),
+                TOPIC_COMM_USER,
+                TOPIC_COMM_USER.ordinal(),
+                ioMsg,
+                PUBLIC_POOL,
+                false,
+                0,
+                false,
+                null,
+                async);
+        }
         else {
             ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId));
 
             Collection<? extends ClusterNode> rmtNodes = F.view(nodes, F.remoteNodes(locNodeId));
 
             if (!rmtNodes.isEmpty())
-                send(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+                sendToGridTopic(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
 
             // Will call local listeners in current thread synchronously or through pool,
             // depending async flag, so must go the last
             // to allow remote nodes execute the requested operation in parallel.
-            if (locNode != null)
-                send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async);
+            if (locNode != null) {
+                send(locNode,
+                    TOPIC_COMM_USER,
+                    TOPIC_COMM_USER.ordinal(),
+                    ioMsg,
+                    PUBLIC_POOL,
+                    false,
+                    0,
+                    false,
+                    null,
+                    async);
+            }
         }
     }
 
@@ -1657,35 +1634,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
-     * @param nodeId Destination node.
-     * @param topic Topic to send the message to.
-     * @param msg Message to send.
-     * @param plc Type of processing.
-     * @param timeout Timeout to keep a message on receiving queue.
-     * @param skipOnTimeout Whether message can be skipped on timeout.
-     * @param ackC Ack closure.
-     * @throws IgniteCheckedException Thrown in case of any errors.
-     */
-    public void sendOrderedMessage(
-        UUID nodeId,
-        Object topic,
-        Message msg,
-        byte plc,
-        long timeout,
-        boolean skipOnTimeout,
-        IgniteInClosure<IgniteException> ackC
-    ) throws IgniteCheckedException {
-        assert timeout > 0 || skipOnTimeout;
-
-        ClusterNode node = ctx.discovery().node(nodeId);
-
-        if (node == null)
-            throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
-
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false);
-    }
-
-    /**
      * @param nodes Destination nodes.
      * @param topic Topic to send the message to.
      * @param topicOrd Topic ordinal value.

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
index a571ae4..ffbde37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
@@ -294,7 +294,7 @@ class GridDeploymentCommunication {
 
         if (node != null) {
             try {
-                ctx.io().send(node, topic, res, GridIoPolicy.P2P_POOL);
+                ctx.io().sendToCustomTopic(node, topic, res, GridIoPolicy.P2P_POOL);
 
                 if (log.isDebugEnabled())
                     log.debug("Sent peer class loading response [node=" + node.id() + ", res=" + res + ']');
@@ -324,7 +324,7 @@ class GridDeploymentCommunication {
         Message req = new GridDeploymentRequest(null, null, rsrcName, true);
 
         if (!rmtNodes.isEmpty()) {
-            ctx.io().send(
+            ctx.io().sendToGridTopic(
                 rmtNodes,
                 TOPIC_CLASSLOAD,
                 req,
@@ -445,7 +445,7 @@ class GridDeploymentCommunication {
             if (req.responseTopic() != null && !ctx.localNodeId().equals(dstNode.id()))
                 req.responseTopicBytes(U.marshal(marsh, req.responseTopic()));
 
-            ctx.io().send(dstNode, TOPIC_CLASSLOAD, req, GridIoPolicy.P2P_POOL);
+            ctx.io().sendToGridTopic(dstNode, TOPIC_CLASSLOAD, req, GridIoPolicy.P2P_POOL);
 
             if (log.isDebugEnabled())
                 log.debug("Sent peer class loading request [node=" + dstNode.id() + ", req=" + req + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index b5d5ee2..656c739 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -1040,12 +1040,12 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
         Collection<? extends ClusterNode> rmtNodes = F.view(nodes, F.remoteNodes(ctx.localNodeId()));
 
         if (locNode != null)
-            ctx.io().send(locNode, topic, msg, plc);
+            ctx.io().sendToGridTopic(locNode, topic, msg, plc);
 
         if (!rmtNodes.isEmpty()) {
             msg.responseTopicBytes(U.marshal(marsh, msg.responseTopic()));
 
-            ctx.io().send(rmtNodes, topic, msg, plc);
+            ctx.io().sendToGridTopic(rmtNodes, topic, msg, plc);
         }
     }
 
@@ -1164,7 +1164,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
                         res.exceptionBytes(U.marshal(marsh, res.exception()));
                     }
 
-                    ctx.io().send(node, req.responseTopic(), res, PUBLIC_POOL);
+                    ctx.io().sendToCustomTopic(node, req.responseTopic(), res, PUBLIC_POOL);
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to send event query response to node [node=" + nodeId + ", res=" +

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index d20310b..50f58cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -908,7 +908,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             try {
                 cnt++;
 
-                cctx.gridIO().send(node, TOPIC_CACHE, msg, plc);
+                cctx.gridIO().sendToGridTopic(node, TOPIC_CACHE, msg, plc);
 
                 return;
             }
@@ -969,7 +969,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     }
                 });
 
-                cctx.gridIO().send(nodesView, TOPIC_CACHE, msg, plc);
+                cctx.gridIO().sendToGridTopic(nodesView, TOPIC_CACHE, msg, plc);
 
                 boolean added = false;
 
@@ -1116,7 +1116,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      * @param plc IO policy.
      * @throws IgniteCheckedException If send failed.
      */
-    public void sendNoRetry(ClusterNode node,
+    void sendNoRetry(ClusterNode node,
         GridCacheMessage msg,
         byte plc)
         throws IgniteCheckedException {
@@ -1127,7 +1127,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             return;
 
         try {
-            cctx.gridIO().send(node, TOPIC_CACHE, msg, plc);
+            cctx.gridIO().sendToGridTopic(node, TOPIC_CACHE, msg, plc);
 
             if (log.isDebugEnabled())
                 log.debug("Sent cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index a1a18fe..f4a5629 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -2065,7 +2065,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 if (!cctx.localNodeId().equals(nodeId))
                     req.prepareMarshal(cctx);
 
-                cctx.gridIO().send(node, TOPIC_TX, req, SYSTEM_POOL);
+                cctx.gridIO().sendToGridTopic(node, TOPIC_TX, req, SYSTEM_POOL);
             }
             catch (IgniteCheckedException e) {
                 if (e instanceof ClusterTopologyCheckedException) {
@@ -2508,7 +2508,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                         if (!cctx.localNodeId().equals(nodeId))
                             res.prepareMarshal(cctx);
 
-                        cctx.gridIO().send(nodeId, TOPIC_TX, res, SYSTEM_POOL);
+                        cctx.gridIO().sendToGridTopic(nodeId, TOPIC_TX, res, SYSTEM_POOL);
                     }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to send response to node [node=" + nodeId + ", res=" + res + ']', e);
@@ -2545,7 +2545,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                     res.futureId(req.futureId());
 
                     try {
-                        cctx.gridIO().send(nodeId, TOPIC_TX, res, SYSTEM_POOL);
+                        cctx.gridIO().sendToGridTopic(nodeId, TOPIC_TX, res, SYSTEM_POOL);
                     }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to send response to node (is node still alive?) [nodeId=" + nodeId +

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
index 257d0d9..d644261 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
@@ -320,7 +320,7 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
                     snapshot.version(), snapshot.deltas());
 
                 try {
-                    ctx.io().send(n, TOPIC_TIME_SYNC, msg, SYSTEM_POOL);
+                    ctx.io().sendToGridTopic(n, TOPIC_TIME_SYNC, msg, SYSTEM_POOL);
                 }
                 catch (IgniteCheckedException e) {
                     if (ctx.discovery().pingNodeNoError(n.id()))

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 575bc69..55f65c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -1397,7 +1397,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                             ackC);
                     }
                     else
-                        ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, ackC);
+                        ctx.io().sendToGridTopic(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, ackC);
 
                     break;
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index b6400e8..74d5f4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -420,7 +420,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
         DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep);
 
         try {
-            ctx.io().send(nodeId, resTopic, res, threadIoPolicy());
+            ctx.io().sendToCustomTopic(nodeId, resTopic, res, threadIoPolicy());
         }
         catch (IgniteCheckedException e) {
             if (ctx.discovery().alive(nodeId))

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index f97fc14..4c1de2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1693,7 +1693,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     topVer);
 
                 try {
-                    ctx.io().send(node, TOPIC_DATASTREAM, req, plc);
+                    ctx.io().sendToGridTopic(node, TOPIC_DATASTREAM, req, plc);
 
                     if (log.isDebugEnabled())
                         log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
index 4c037b7..0b2558a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
@@ -27,6 +27,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -169,7 +170,10 @@ public class IgfsContext {
         if (!kernalContext().localNodeId().equals(nodeId))
             msg.prepareMarshal(kernalContext().config().getMarshaller());
 
-        kernalContext().io().send(nodeId, topic, msg, plc);
+        if (topic instanceof GridTopic)
+            kernalContext().io().sendToGridTopic(nodeId, (GridTopic)topic, msg, plc);
+        else
+            kernalContext().io().sendToCustomTopic(nodeId, topic, msg, plc);
     }
 
     /**
@@ -184,7 +188,7 @@ public class IgfsContext {
         if (!kernalContext().localNodeId().equals(node.id()))
             msg.prepareMarshal(kernalContext().config().getMarshaller());
 
-        kernalContext().io().send(node, topic, msg, plc);
+        kernalContext().io().sendToCustomTopic(node, topic, msg, plc);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 2b6699d..9ed6ff3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -518,7 +518,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
             ctx.io().addMessageListener(topic, msgLsnr);
 
             // 3. Send message.
-            ctx.io().send(taskNode, TOPIC_JOB_SIBLINGS,
+            ctx.io().sendToGridTopic(taskNode, TOPIC_JOB_SIBLINGS,
                 new GridJobSiblingsRequest(ses.getId(),
                     loc ? topic : null,
                     loc ? null : U.marshal(marsh, topic)),
@@ -1379,7 +1379,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
                 ctx.task().processJobExecuteResponse(ctx.localNodeId(), jobRes);
             else
                 // Send response to common topic as unordered message.
-                ctx.io().send(sndNode, TOPIC_TASK, jobRes, req.isInternal() ? MANAGEMENT_POOL : SYSTEM_POOL);
+                ctx.io().sendToGridTopic(sndNode, TOPIC_TASK, jobRes, req.isInternal() ? MANAGEMENT_POOL : SYSTEM_POOL);
         }
         catch (IgniteCheckedException e) {
             // The only option here is to log, as we must assume that resending will fail too.

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index acefde7..9b7615f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -912,7 +912,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
                                 ctx.task().processJobExecuteResponse(ctx.localNodeId(), jobRes);
                             else
                                 // Send response to common topic as unordered message.
-                                ctx.io().send(sndNode, TOPIC_TASK, jobRes, internal ? MANAGEMENT_POOL : SYSTEM_POOL);
+                                ctx.io().sendToGridTopic(sndNode, TOPIC_TASK, jobRes, internal ? MANAGEMENT_POOL : SYSTEM_POOL);
                         }
                         catch (IgniteCheckedException e) {
                             // Log and invoke the master-leave callback.

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
index 773dabe..0be4e09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
@@ -96,7 +96,7 @@ final class ClientRequestFuture extends GridFutureAdapter<MappingExchangeResult>
                 ClusterNode srvNode = aliveSrvNodes.poll();
 
                 try {
-                    ioMgr.send(
+                    ioMgr.sendToGridTopic(
                             srvNode,
                             GridTopic.TOPIC_MAPPING_MARSH,
                             new MissingMappingRequestMessage(

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index fdea869..66c19a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@ -178,7 +178,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
             String resolvedClsName = marshallerCtx.resolveMissedMapping(platformId, typeId);
 
             try {
-                ioMgr.send(
+                ioMgr.sendToGridTopic(
                         nodeId,
                         TOPIC_MAPPING_MARSH,
                         new MissingMappingResponseMessage(platformId, typeId, resolvedClsName),

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
index 947435c..99ba335 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
@@ -143,7 +143,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
 
                     Object topic = U.unmarshal(ctx, req.topicBytes(), U.resolveClassLoader(ctx.config()));
 
-                    ctx.io().send(nodeId, topic, res, SYSTEM_POOL);
+                    ctx.io().sendToCustomTopic(nodeId, topic, res, SYSTEM_POOL);
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to send job task result response.", e);
@@ -494,7 +494,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
             try {
                 byte[] topicBytes = U.marshal(ctx, topic);
 
-                ctx.io().send(taskNode, TOPIC_REST, new GridTaskResultRequest(taskId, topic, topicBytes), SYSTEM_POOL);
+                ctx.io().sendToGridTopic(taskNode, TOPIC_REST, new GridTaskResultRequest(taskId, topic, topicBytes), SYSTEM_POOL);
             }
             catch (IgniteCheckedException e) {
                 String errMsg = "Failed to send task result request [resHolderId=" + resHolderId +

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index d32b51c..ec5d4c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -1311,7 +1311,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
 
                     boolean loc = ctx.localNodeId().equals(nodeId);
 
-                    ctx.io().send(nodeId, topic,
+                    ctx.io().sendToCustomTopic(nodeId, topic,
                         new GridJobSiblingsResponse(
                             loc ? siblings : null,
                             loc ? null : U.marshal(marsh, siblings)),

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index d18ea5f..02ef0fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -1281,7 +1281,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                     ClusterNode node = ctx.discovery().node(nodeId);
 
                     if (node != null)
-                        ctx.io().send(node,
+                        ctx.io().sendToGridTopic(node,
                             TOPIC_JOB_CANCEL,
                             new GridJobCancelRequest(ses.getId(), res.getJobContext().getJobId(), /*courtesy*/true),
                             PUBLIC_POOL);
@@ -1382,7 +1382,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                         ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req);
                     else {
                         // Send job execution request.
-                        ctx.io().send(node, TOPIC_JOB, req, internal ? MANAGEMENT_POOL : PUBLIC_POOL);
+                        ctx.io().sendToGridTopic(node, TOPIC_JOB, req, internal ? MANAGEMENT_POOL : PUBLIC_POOL);
 
                         if (log.isDebugEnabled())
                             log.debug("Sent job request [req=" + req + ", node=" + node + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index 8503b48..f58be87 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@ -129,7 +129,7 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
         long time = System.nanoTime();
 
         for (int i = 1; i <= SAMPLE_CNT; i++) {
-            mgr0.send(grid(1).localNode(), topic, new TestMessage(), GridIoPolicy.PUBLIC_POOL);
+            mgr0.sendToCustomTopic(grid(1).localNode(), topic, new TestMessage(), GridIoPolicy.PUBLIC_POOL);
 
             if (i % 500 == 0)
                 info("Sent messages count: " + i);

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
index f5499d3..f4257a0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
@@ -90,21 +90,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
     public void testSendIfOneOfNodesIsLocalAndTopicIsEnum() throws Exception {
         GridTestUtils.assertThrows(log, new Callable<Object>() {
             @Override public Object call() throws Exception {
-                new GridIoManager(ctx).send(F.asList(locNode, rmtNode), GridTopic.TOPIC_CACHE, new TestMessage(),
-                    GridIoPolicy.P2P_POOL);
-
-                return null;
-            }
-        }, AssertionError.class, "Internal Ignite code should never call the method with local node in a node list.");
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSendIfOneOfNodesIsLocalAndTopicIsObject() throws Exception {
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                new GridIoManager(ctx).send(F.asList(locNode, rmtNode), new Object(), new TestMessage(),
+                new GridIoManager(ctx).sendToGridTopic(F.asList(locNode, rmtNode), GridTopic.TOPIC_CACHE, new TestMessage(),
                     GridIoPolicy.P2P_POOL);
 
                 return null;
@@ -127,12 +113,12 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
             // No-op. We are using mocks so real sending is impossible.
         }
 
-        verify(ioMgr).send(eq(locNode), eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class),
+        verify(ioMgr).sendToGridTopic(eq(locNode), eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class),
             eq(GridIoPolicy.PUBLIC_POOL));
 
         Collection<? extends ClusterNode> rmtNodes = F.view(F.asList(rmtNode), F.remoteNodes(locNode.id()));
 
-        verify(ioMgr).send(argThat(new IsEqualCollection(rmtNodes)), eq(GridTopic.TOPIC_COMM_USER),
+        verify(ioMgr).sendToGridTopic(argThat(new IsEqualCollection(rmtNodes)), eq(GridTopic.TOPIC_COMM_USER),
             any(GridIoUserMessage.class), eq(GridIoPolicy.PUBLIC_POOL));
     }
 
@@ -151,12 +137,12 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
             // No-op. We are using mocks so real sending is impossible.
         }
 
-        verify(ioMgr).send(eq(locNode), eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class),
+        verify(ioMgr).sendToGridTopic(eq(locNode), eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class),
             eq(GridIoPolicy.PUBLIC_POOL));
 
         Collection<? extends ClusterNode> rmtNodes = F.view(F.asList(rmtNode), F.remoteNodes(locNode.id()));
 
-        verify(ioMgr).send(argThat(new IsEqualCollection(rmtNodes)), eq(GridTopic.TOPIC_COMM_USER),
+        verify(ioMgr).sendToGridTopic(argThat(new IsEqualCollection(rmtNodes)), eq(GridTopic.TOPIC_COMM_USER),
             any(GridIoUserMessage.class), eq(GridIoPolicy.PUBLIC_POOL));
     }
 
@@ -175,7 +161,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
             // No-op. We are using mocks so real sending is impossible.
         }
 
-        verify(ioMgr).sendOrderedMessage(
+        verify(ioMgr).sendOrderedMessageToGridTopic(
             argThat(new IsEqualCollection(F.asList(locNode, rmtNode))),
             eq(GridTopic.TOPIC_COMM_USER),
             any(GridIoUserMessage.class),
@@ -196,7 +182,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async)
+        @Override public void sendToGridTopic(ClusterNode node, GridTopic topic, Message msg, byte plc)
             throws IgniteCheckedException {
             // No-op.
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/internal/util/nio/IgniteExceptionInNioWorkerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/IgniteExceptionInNioWorkerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/IgniteExceptionInNioWorkerSelfTest.java
index 9961833..8ac6e6a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/IgniteExceptionInNioWorkerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/IgniteExceptionInNioWorkerSelfTest.java
@@ -74,7 +74,7 @@ public class IgniteExceptionInNioWorkerSelfTest extends GridCommonAbstractTest {
             UUID nodeId = ignite(1).cluster().localNode().id();
 
             // This should trigger a failure in a NIO thread.
-            kernal.context().io().send(nodeId, GridTopic.TOPIC_CACHE.topic("cache"), new BrokenMessage(), (byte)0);
+            kernal.context().io().sendToCustomTopic(nodeId, GridTopic.TOPIC_CACHE.topic("cache"), new BrokenMessage(), (byte)0);
 
             for (int i = 0; i < 100; i++)
                 ignite(0).cache("cache").put(i, i);

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
index 723495c..03bbb00 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
@@ -249,7 +249,7 @@ public class GridIoManagerBenchmark {
                 testMsg.bytes(null);
 
                 try {
-                    io.send(node, TEST_TOPIC, testMsg, PUBLIC_POOL);
+                    io.sendToCustomTopic(node, TEST_TOPIC, testMsg, PUBLIC_POOL);
                 }
                 catch (IgniteCheckedException e) {
                     e.printStackTrace();
@@ -293,7 +293,7 @@ public class GridIoManagerBenchmark {
                     else
                         sem.acquire();
 
-                    io.send(
+                    io.sendToCustomTopic(
                         dst,
                         TEST_TOPIC,
                         new GridTestMessage(msgId, testHeavyMsgs ? arrs[rnd.nextInt(arrs.length)] : null),

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
index f2c6255..92b29e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
@@ -132,7 +132,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
             new GridMessageListener() {
                 @Override public void onMessage(UUID nodeId, Object msg) {
                     try {
-                        rcv.send(sndNode, topic, (Message)msg, PUBLIC_POOL);
+                        rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
                     }
                     catch (IgniteCheckedException e) {
                         error("Failed to send message.", e);
@@ -176,7 +176,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
                     while (!finish.get()) {
                         sem.acquire();
 
-                        snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
+                        snd.sendToCustomTopic(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
                     }
                 }
                 catch (IgniteCheckedException e) {
@@ -226,7 +226,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
             new GridMessageListener() {
                 @Override public void onMessage(UUID nodeId, Object msg) {
                     try {
-                        rcv.send(sndNode, topic, (Message)msg, PUBLIC_POOL);
+                        rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
                     }
                     catch (IgniteCheckedException e) {
                         error("Failed to send message.", e);
@@ -270,7 +270,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
 
                         map.put(msgId, latch);
 
-                        snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
+                        snd.sendToCustomTopic(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
 
                         latch.await();
 
@@ -326,7 +326,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
             new GridMessageListener() {
                 @Override public void onMessage(UUID nodeId, Object msg) {
                     try {
-                        rcv.send(sndNode, topic, (Message)msg, PUBLIC_POOL);
+                        rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
                     }
                     catch (IgniteCheckedException e) {
                         error("Failed to send message.", e);
@@ -362,7 +362,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
 
                     sem.acquire();
 
-                    snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
+                    snd.sendToCustomTopic(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
                 }
 
                 return null;
@@ -432,7 +432,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
 
                     latches.put(msgId, latch);
 
-                    snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
+                    snd.sendToCustomTopic(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
 
                     long start = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
index 9c97542..c0ea662 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
@@ -194,7 +194,7 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
             msg.add(mes2);
         }
 
-        mgr0.send(grid(1).localNode(), topic, msg, GridIoPolicy.PUBLIC_POOL);
+        mgr0.sendToCustomTopic(grid(1).localNode(), topic, msg, GridIoPolicy.PUBLIC_POOL);
 
         assert latch.await(3, SECONDS);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index e67a26a..7575ff4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -145,7 +145,7 @@ public class HadoopShuffle extends HadoopComponent {
         ClusterNode node = ctx.kernalContext().discovery().node(nodeId);
 
         if (msg instanceof Message)
-            ctx.kernalContext().io().send(node, GridTopic.TOPIC_HADOOP_MSG, (Message)msg, GridIoPolicy.PUBLIC_POOL);
+            ctx.kernalContext().io().sendToGridTopic(node, GridTopic.TOPIC_HADOOP_MSG, (Message)msg, GridIoPolicy.PUBLIC_POOL);
         else
             ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0, false);
     }
@@ -153,6 +153,7 @@ public class HadoopShuffle extends HadoopComponent {
     /**
      * @param jobId Task info.
      * @return Shuffle job.
+     * @throws IgniteCheckedException If failed.
      */
     private HadoopShuffleJob<UUID> job(HadoopJobId jobId) throws IgniteCheckedException {
         HadoopShuffleJob<UUID> res = jobs.get(jobId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index e4b0c1f..88cd89b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1890,7 +1890,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                         ((GridCacheQueryMarshallable)msg).marshall(marshaller);
                 }
 
-                ctx.io().send(node, topic, topicOrd, msg, plc);
+                ctx.io().sendGeneric(node, topic, topicOrd, msg, plc);
             }
             catch (IgniteCheckedException e) {
                 ok = false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 2802da5..33a6778 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -655,7 +655,7 @@ public class GridMapQueryExecutor {
                 h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
             }
             else
-                ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
+                ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
         }
         catch (Exception e) {
             e.addSuppressed(err);
@@ -729,7 +729,7 @@ public class GridMapQueryExecutor {
             if (loc)
                 h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
             else
-                ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
+                ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
         }
         catch (IgniteCheckedException e) {
             log.error("Failed to send message.", e);
@@ -756,7 +756,7 @@ public class GridMapQueryExecutor {
             if (loc)
                 h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
             else
-                ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
+                ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
         }
         catch (Exception e) {
             U.warn(log, "Failed to send retry message: " + e.getMessage());

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 61ca11d..604e522 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -328,7 +328,7 @@ public class GridReduceQueryExecutor {
                         if (node.isLocal())
                             h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0);
                         else
-                            ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.QUERY_POOL);
+                            ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.QUERY_POOL);
                     }
                     catch (IgniteCheckedException e) {
                         throw new CacheException("Failed to fetch data from node: " + node.id(), e);


[3/3] ignite git commit: ignite-4705

Posted by sb...@apache.org.
ignite-4705


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/46c71ae5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/46c71ae5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/46c71ae5

Branch: refs/heads/ignite-4705-2
Commit: 46c71ae512edc39a364b0389b9057e9935517130
Parents: ff55b4d
Author: sboikov <sb...@gridgain.com>
Authored: Sat Mar 4 12:46:53 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Sat Mar 4 19:33:17 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    |  11 +-
 .../GridCachePartitionExchangeManager.java      |  85 +++++------
 .../GridDhtAtomicAbstractUpdateFuture.java      |  15 +-
 .../dht/atomic/GridDhtAtomicCache.java          | 140 ++++++++++---------
 .../atomic/GridDhtAtomicSingleUpdateFuture.java |   5 -
 .../GridDhtAtomicSingleUpdateRequest.java       |   6 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   5 -
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |   6 +-
 .../GridNearAtomicAbstractUpdateFuture.java     |  61 ++++++--
 .../GridNearAtomicAbstractUpdateRequest.java    |  14 +-
 .../GridNearAtomicCheckUpdateRequest.java       |   9 +-
 .../GridNearAtomicSingleUpdateFuture.java       |  34 ++---
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  76 +++++++---
 .../GridDhtPartitionsExchangeFuture.java        |   4 +-
 ...niteCacheClientNodeChangingTopologyTest.java |   5 +-
 15 files changed, 287 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 9aec10d..ba2f86e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -216,13 +216,13 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             if (fut != null && !fut.isDone()) {
                 fut.listen(new CI1<IgniteInternalFuture<?>>() {
                     @Override public void apply(IgniteInternalFuture<?> t) {
-                        cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                        Runnable c = new Runnable() {
                             @Override public void run() {
                                 IgniteLogger log = cacheMsg.messageLogger(cctx);
 
                                 if (log.isDebugEnabled()) {
                                     StringBuilder msg0 = new StringBuilder("Process cache message after wait for " +
-                                        "affinity topology version [");
+                                            "affinity topology version [");
 
                                     appendMessageInfo(cacheMsg, nodeId, msg0).append(']');
 
@@ -231,7 +231,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                                 handleMessage(nodeId, cacheMsg);
                             }
-                        });
+                        };
+
+                        if (((GridCacheMessage)msg).partition() >= 0)
+                            cctx.kernalContext().getStripedExecutorService().execute(((GridCacheMessage) msg).partition(), c);
+                        else
+                            cctx.kernalContext().closure().runLocalSafe(c);
                     }
                 });
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index e44f4a8..5a64758 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -62,6 +62,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPar
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
@@ -79,6 +81,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridListSet;
+import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.IgnitePair;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -1477,68 +1480,70 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param exchTopVer Exchange topology version.
      */
     private void dumpPendingObjects(@Nullable AffinityTopologyVersion exchTopVer) {
-        IgniteTxManager tm = cctx.tm();
+        synchronized (getClass()) {
+            IgniteTxManager tm = cctx.tm();
 
-        if (tm != null) {
-            U.warn(log, "Pending transactions:");
+            if (tm != null) {
+                U.warn(log, "Pending transactions:");
 
-            for (IgniteInternalTx tx : tm.activeTransactions()) {
-                if (exchTopVer != null) {
-                    U.warn(log, ">>> [txVer=" + tx.topologyVersionSnapshot() +
-                        ", exchWait=" + tm.needWaitTransaction(tx, exchTopVer) +
-                        ", tx=" + tx + ']');
+                for (IgniteInternalTx tx : tm.activeTransactions()) {
+                    if (exchTopVer != null) {
+                        U.warn(log, ">>> [txVer=" + tx.topologyVersionSnapshot() +
+                                ", exchWait=" + tm.needWaitTransaction(tx, exchTopVer) +
+                                ", tx=" + tx + ']');
+                    }
+                    else
+                        U.warn(log, ">>> [txVer=" + tx.topologyVersionSnapshot() + ", tx=" + tx + ']');
                 }
-                else
-                    U.warn(log, ">>> [txVer=" + tx.topologyVersionSnapshot() + ", tx=" + tx + ']');
             }
-        }
 
-        GridCacheMvccManager mvcc = cctx.mvcc();
+            GridCacheMvccManager mvcc = cctx.mvcc();
 
-        if (mvcc != null) {
-            U.warn(log, "Pending explicit locks:");
+            if (mvcc != null) {
+                U.warn(log, "Pending explicit locks:");
 
-            for (GridCacheExplicitLockSpan lockSpan : mvcc.activeExplicitLocks())
-                U.warn(log, ">>> " + lockSpan);
+                for (GridCacheExplicitLockSpan lockSpan : mvcc.activeExplicitLocks())
+                    U.warn(log, ">>> " + lockSpan);
 
-            U.warn(log, "Pending cache futures:");
+                U.warn(log, "Pending cache futures:");
 
-            for (GridCacheFuture<?> fut : mvcc.activeFutures())
-                U.warn(log, ">>> " + fut);
+                for (GridCacheFuture<?> fut : mvcc.activeFutures())
+                    U.warn(log, ">>> " + fut);
 
-            U.warn(log, "Pending atomic cache futures:");
+                U.warn(log, "Pending atomic cache futures:");
 
-            for (GridCacheFuture<?> fut : mvcc.atomicFutures())
-                U.warn(log, ">>> " + fut);
+                for (GridCacheFuture<?> fut : mvcc.atomicFutures())
+                    U.warn(log, ">>> " + fut);
 
-            U.warn(log, "Pending data streamer futures:");
+                U.warn(log, "Pending data streamer futures:");
 
-            for (IgniteInternalFuture<?> fut : mvcc.dataStreamerFutures())
-                U.warn(log, ">>> " + fut);
+                for (IgniteInternalFuture<?> fut : mvcc.dataStreamerFutures())
+                    U.warn(log, ">>> " + fut);
 
-            if (tm != null) {
-                U.warn(log, "Pending transaction deadlock detection futures:");
+                if (tm != null) {
+                    U.warn(log, "Pending transaction deadlock detection futures:");
 
-                for (IgniteInternalFuture<?> fut : tm.deadlockDetectionFutures())
-                    U.warn(log, ">>> " + fut);
+                    for (IgniteInternalFuture<?> fut : tm.deadlockDetectionFutures())
+                        U.warn(log, ">>> " + fut);
+                }
             }
-        }
 
-        for (GridCacheContext ctx : cctx.cacheContexts()) {
-            if (ctx.isLocal())
-                continue;
+            for (GridCacheContext ctx : cctx.cacheContexts()) {
+                if (ctx.isLocal())
+                    continue;
 
-            GridCacheContext ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx;
+                GridCacheContext ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx;
 
-            GridCachePreloader preloader = ctx0.preloader();
+                GridCachePreloader preloader = ctx0.preloader();
 
-            if (preloader != null)
-                preloader.dumpDebugInfo();
+                if (preloader != null)
+                    preloader.dumpDebugInfo();
 
-            GridCacheAffinityManager affMgr = ctx0.affinity();
+                GridCacheAffinityManager affMgr = ctx0.affinity();
 
-            if (affMgr != null)
-                affMgr.dumpDebugInfo();
+                if (affMgr != null)
+                    affMgr.dumpDebugInfo();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 5e01726..dcd4a18 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheE
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -78,6 +79,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     protected final GridCacheContext cctx;
 
     /** Future version. */
+    @GridToStringInclude
     protected final Long futId;
 
     /** Update request. */
@@ -94,9 +96,11 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     private volatile int resCnt;
 
     /** */
+    @GridToStringExclude
     private final GridNearAtomicUpdateResponse updateRes;
 
     /** */
+    @GridToStringExclude
     private final GridDhtAtomicCache.UpdateReplyClosure completionCb;
 
     /**
@@ -368,9 +372,10 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
         boolean needReplyToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC ||
             !ret.emptyResult() ||
-            updateRes.nearVersion() != null;
+            updateRes.nearVersion() != null ||
+            cctx.localNodeId().equals(nearNode.id());
 
-        boolean needMapping = updateReq.fullSync() && (!updateReq.mappingKnown() || !allUpdated());
+        boolean needMapping = updateReq.fullSync() && (!updateReq.needPrimaryResponse() || !allUpdated());
 
         if (needMapping) {
             initMapping(updateRes);
@@ -470,12 +475,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     }
 
     /**
-     * @param updateRes Response.
-     * @param err Error.
-     */
-    protected abstract void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err);
-
-    /**
      * @param nodeId Node ID.
      * @param futId Future ID.
      * @param writeVer Update version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index fc0e16c..5cd07b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -3078,7 +3078,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param checkReq Request.
      */
     private void processCheckUpdateRequest(UUID nodeId, GridNearAtomicCheckUpdateRequest checkReq) {
-        /**
+        /*
          * Message is processed in the same stripe, so primary already processed update request. It is possible
          * response was not sent if operation result was empty. Near node will get original response or this one.
          */
@@ -3104,6 +3104,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param req Dht atomic update request.
      */
     private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicAbstractUpdateRequest req) {
+        assert Thread.currentThread().getName().startsWith("sys-stripe-") : Thread.currentThread().getName();
+
         if (msgLog.isDebugEnabled()) {
             msgLog.debug("Received DHT atomic update request [futId=" + req.futureId() +
                 ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
@@ -3272,66 +3274,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /**
-     *
-     */
-    private class DeferredUpdateTimeout implements GridTimeoutObject, Runnable {
-        /** */
-        private final int part;
-
-        /** */
-        private final UUID primaryId;
-
-        /** */
-        private final IgniteUuid id;
-
-        /** */
-        private final long endTime;
-
-        /**
-         * @param part Partition.
-         * @param primaryId Primary ID.
-         */
-        DeferredUpdateTimeout(int part, UUID primaryId) {
-            this.part = part;
-            this.primaryId = primaryId;
-
-            endTime = U.currentTimeMillis() + DEFERRED_UPDATE_RESPONSE_TIMEOUT;
-
-            id = IgniteUuid.fromUuid(primaryId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteUuid timeoutId() {
-            return id;
-        }
-
-        /** {@inheritDoc} */
-        @Override public long endTime() {
-            return endTime;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void run() {
-            Map<UUID, GridDhtAtomicDeferredUpdateResponse> resMap = defRes.get();
-
-            GridDhtAtomicDeferredUpdateResponse msg = resMap.get(primaryId);
-
-            if (msg != null && msg.timeoutSender() == this) {
-                msg.timeoutSender(null);
-
-                resMap.remove(primaryId);
-
-                sendDeferredUpdateResponse(primaryId, msg);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onTimeout() {
-            ctx.kernalContext().getStripedExecutorService().execute(part, this);
-        }
-    }
-
-    /**
      * @param part Partition.
      * @param primaryId Primary ID.
      * @param futId Future ID.
@@ -3362,7 +3304,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         futIds.add(futId);
 
-        if (futIds.size() == DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE) {
+        if (futIds.size() >= DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE) {
             resMap.remove(primaryId);
 
             sendDeferredUpdateResponse(primaryId, msg);
@@ -3375,14 +3317,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     private void sendDeferredUpdateResponse(UUID primaryId, GridDhtAtomicDeferredUpdateResponse msg) {
         try {
-            ctx.kernalContext().gateway().readLock();
+            //ctx.kernalContext().gateway().readLock();
 
-            GridTimeoutObject timeoutSnd = msg.timeoutSender();
+            try {
+                GridTimeoutObject timeoutSnd = msg.timeoutSender();
 
-            if (timeoutSnd != null)
-                ctx.time().removeTimeoutObject(timeoutSnd);
+                if (timeoutSnd != null)
+                    ctx.time().removeTimeoutObject(timeoutSnd);
 
-            try {
                 ctx.io().send(primaryId, msg, ctx.ioPolicy());
 
                 if (msgLog.isDebugEnabled()) {
@@ -3391,7 +3333,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             }
             finally {
-                ctx.kernalContext().gateway().readUnlock();
+               // ctx.kernalContext().gateway().readUnlock();
             }
         }
         catch (IllegalStateException ignored) {
@@ -3706,7 +3648,67 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     /**
      *
      */
-    static interface UpdateReplyClosure extends CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> {
+    interface UpdateReplyClosure extends CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> {
         // No-op.
     }
+
+    /**
+     *
+     */
+    private class DeferredUpdateTimeout implements GridTimeoutObject, Runnable {
+        /** */
+        private final int part;
+
+        /** */
+        private final UUID primaryId;
+
+        /** */
+        private final IgniteUuid id;
+
+        /** */
+        private final long endTime;
+
+        /**
+         * @param part Partition.
+         * @param primaryId Primary ID.
+         */
+        DeferredUpdateTimeout(int part, UUID primaryId) {
+            this.part = part;
+            this.primaryId = primaryId;
+
+            endTime = U.currentTimeMillis() + DEFERRED_UPDATE_RESPONSE_TIMEOUT;
+
+            id = IgniteUuid.fromUuid(primaryId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid timeoutId() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long endTime() {
+            return endTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            Map<UUID, GridDhtAtomicDeferredUpdateResponse> resMap = defRes.get();
+
+            GridDhtAtomicDeferredUpdateResponse msg = resMap.get(primaryId);
+
+            if (msg != null && msg.timeoutSender() == this) {
+                msg.timeoutSender(null);
+
+                resMap.remove(primaryId);
+
+                sendDeferredUpdateResponse(primaryId, msg);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
+            ctx.kernalContext().getStripedExecutorService().execute(part, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 0566ce4..879dd40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -120,11 +120,6 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
         }
     }
 
-    /** {@inheritDoc} */
-    @Override protected void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err) {
-
-    }
-
     /**
      * @param ttl TTL.
      * @param conflictExpireTime Conflict expire time.

http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index 10dc77c..092bccb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -197,7 +197,11 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
     /** {@inheritDoc} */
     @Override public int partition() {
-        return key.partition();
+        int p = key.partition();
+
+        assert p >= 0;
+
+        return p;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 49e168a..32df24d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -103,11 +103,6 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
     }
 
     /** {@inheritDoc} */
-    @Override protected void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err) {
-
-    }
-
-    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtAtomicUpdateFuture.class, this, "super", super.toString());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 78368fb..6b8af8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -427,7 +427,11 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
     @Override public int partition() {
         assert !F.isEmpty(keys) || !F.isEmpty(nearKeys);
 
-        return keys.size() > 0 ? keys.get(0).partition() : nearKeys.get(0).partition();
+        int p = keys.size() > 0 ? keys.get(0).partition() : nearKeys.get(0).partition();
+
+        assert p >= 0;
+
+        return p;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 5369d53..bb1e224 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -31,6 +31,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -205,8 +206,21 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         this.remapCnt = remapCnt;
     }
 
-    void sendCheckUpdateRequest(GridNearAtomicCheckUpdateRequest req) {
+    /** {@inheritDoc} */
+    @Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+        return null;
+    }
 
+    void sendCheckUpdateRequest(GridNearAtomicCheckUpdateRequest req) {
+        try {
+            cctx.io().send(req.updateRequest().nodeId(), req, cctx.ioPolicy());
+        }
+        catch (ClusterTopologyCheckedException e) {
+            onSendError(req, e);
+        }
+        catch (IgniteCheckedException e) {
+            onDone(e);
+        }
     }
 
     /**
@@ -370,6 +384,17 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         onPrimaryResponse(req.nodeId(), res, true);
     }
 
+    final void onSendError(GridNearAtomicCheckUpdateRequest req, IgniteCheckedException e) {
+        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+                req.updateRequest().nodeId(),
+                req.futureId(),
+                cctx.deploymentEnabled());
+
+        res.addFailedKeys(req.updateRequest().keys(), e);
+
+        onPrimaryResponse(req.updateRequest().nodeId(), res, true);
+    }
+
     /**
      *
      */
@@ -381,7 +406,11 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         @GridToStringInclude
         private Set<UUID> dhtNodes;
 
+        @GridToStringInclude
+        private List<UUID> dhtNodes0;
+
         /** */
+        @GridToStringInclude
         private Set<UUID> rcvd;
 
         /** */
@@ -396,6 +425,11 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             this.req = req;
 
             if (req.initMappingLocally()) {
+                dhtNodes0 = new ArrayList<>();
+
+                for (ClusterNode n : nodes)
+                    dhtNodes0.add(n.id());
+
                 if (single) {
                     if (nodes.size() > 1) {
                         dhtNodes = U.newHashSet(nodes.size() - 1);
@@ -424,13 +458,16 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
 
             for (int i = 1; i < nodes.size(); i++)
                 dhtNodes.add(nodes.get(i).id());
+
+            for (int i = 1; i < nodes.size(); i++)
+                dhtNodes0.add(nodes.get(i).id());
         }
 
-        boolean checkDhtNodes(GridCacheContext cctx) {
+        DhtLeftResult checkDhtNodes(GridCacheContext cctx) {
             assert req.initMappingLocally() : req;
 
             if (finished())
-                return false;
+                return DhtLeftResult.NOT_DONE;
 
             boolean finished = false;
 
@@ -448,7 +485,13 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
                 }
             }
 
-            return finished;
+            if (finished)
+                return DhtLeftResult.DONE;
+
+            if (dhtNodes.isEmpty())
+                return req.needPrimaryResponse() ? DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE;
+
+            return DhtLeftResult.NOT_DONE;
         }
 
         /**
@@ -487,7 +530,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
                 if (hasRes)
                     return DhtLeftResult.DONE;
                 else
-                    return req.mappingKnown() ? DhtLeftResult.ALL_RCVD_CHECK_UPDATE : DhtLeftResult.NOT_DONE;
+                    return req.needPrimaryResponse() ? DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE;
             }
 
             return DhtLeftResult.NOT_DONE;
@@ -572,8 +615,10 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(PrimaryRequestState.class, this,
-                "node", req.nodeId(),
-                "rcvdRes", req.response() != null);
+                "primary", primaryId(),
+                "mapppingKnown", req.needPrimaryResponse(),
+                "primaryRes", req.response() != null,
+                "done", finished());
         }
     }
 
@@ -588,7 +633,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         NOT_DONE,
 
         /** */
-        ALL_RCVD_CHECK_UPDATE
+        ALL_RCVD_CHECK_PRIMARY
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index a833588..34a3c0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -47,7 +47,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     public static final int CACHE_MSG_IDX = nextIndexId();
 
     /** . */
-    private static final int MAPPING_KNOWN_FLAG_MASK = 0x01;
+    private static final int NEED_PRIMARY_RES_FLAG_MASK = 0x01;
 
     /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
     private static final int TOP_LOCKED_FLAG_MASK = 0x02;
@@ -141,7 +141,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
         this.addDepInfo = addDepInfo;
 
         if (mappingKnown)
-            mappingKnown(true);
+            needPrimaryResponse(true);
 
         if (topLocked)
             topologyLocked(true);
@@ -177,11 +177,11 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     }
 
     boolean initMappingLocally() {
-        return mappingKnown() && fullSync();
+        return needPrimaryResponse() && fullSync();
     }
 
-    boolean mappingKnown() {
-        return isFlag(MAPPING_KNOWN_FLAG_MASK);
+    boolean needPrimaryResponse() {
+        return isFlag(NEED_PRIMARY_RES_FLAG_MASK);
     }
 
     boolean fullSync() {
@@ -190,8 +190,8 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
         return syncMode == CacheWriteSynchronizationMode.FULL_SYNC;
     }
 
-    void mappingKnown(boolean stableTop) {
-        setFlag(stableTop, MAPPING_KNOWN_FLAG_MASK);
+    void needPrimaryResponse(boolean stableTop) {
+        setFlag(stableTop, NEED_PRIMARY_RES_FLAG_MASK);
     }
 
     public int taskNameHash() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
index 030abdf..a30269b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
@@ -51,14 +51,13 @@ public class GridNearAtomicCheckUpdateRequest extends GridCacheMessage {
         // No-op.
     }
 
-    GridNearAtomicCheckUpdateRequest(int cacheId, GridNearAtomicAbstractUpdateRequest updateReq, int partId, long futId) {
+    GridNearAtomicCheckUpdateRequest(GridNearAtomicAbstractUpdateRequest updateReq) {
         assert updateReq != null;
-        assert partId >= 0 : partId;
 
-        this.cacheId = cacheId;
         this.updateReq = updateReq;
-        this.partId = partId;
-        this.futId = futId;
+        this.cacheId = updateReq.cacheId();
+        this.partId = updateReq.partition();
+        this.futId = updateReq.futureId();
     }
 
     long futureId() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 82d397d..f96de31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -164,11 +162,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
                 if (res == DhtLeftResult.DONE)
                     rcvAll = true;
-                else if (res == DhtLeftResult.ALL_RCVD_CHECK_UPDATE) {
-                    checkReq = new GridNearAtomicCheckUpdateRequest(cctx.cacheId(),
-                        reqState.req,
-                        reqState.req.partition(),
-                        futId);
+                else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY) {
+                    checkReq = new GridNearAtomicCheckUpdateRequest(reqState.req);
                 }
                 else
                     return false;
@@ -190,11 +185,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
-        return null;
-    }
-
-    /** {@inheritDoc} */
     @SuppressWarnings("ConstantConditions")
     @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
         assert res == null || res instanceof GridCacheReturn;
@@ -529,9 +519,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      * @return
      */
     private boolean checkDhtNodes(Long futId) {
-        GridCacheReturn opRes0;
-        CachePartialUpdateCheckedException err0;
-        AffinityTopologyVersion remapTopVer0;
+        GridCacheReturn opRes0 = null;
+        CachePartialUpdateCheckedException err0 = null;
+        AffinityTopologyVersion remapTopVer0 = null;
+
+        GridNearAtomicCheckUpdateRequest checkReq = null;
 
         synchronized (mux) {
             if (this.futId == null || !this.futId.equals(futId))
@@ -539,16 +531,24 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
             assert reqState != null;
 
-            if (reqState.checkDhtNodes(cctx)) {
+            DhtLeftResult res = reqState.checkDhtNodes(cctx);
+
+            if (res == DhtLeftResult.DONE) {
                 opRes0 = opRes;
                 err0 = err;
                 remapTopVer0 = onAllReceived();
             }
+            else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY){
+                checkReq = new GridNearAtomicCheckUpdateRequest(reqState.req);
+            }
             else
                 return true;
         }
 
-        finishUpdateFuture(opRes0, err0, remapTopVer0);
+        if (checkReq != null)
+            sendCheckUpdateRequest(checkReq);
+        else
+            finishUpdateFuture(opRes0, err0, remapTopVer0);
 
         return false;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 7b1c530..77ba579 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -164,6 +164,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         boolean rcvAll = false;
 
+        List<GridNearAtomicCheckUpdateRequest> checkReqs = null;
+
         synchronized (mux) {
             if (futId == null)
                 return false;
@@ -173,6 +175,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     GridNearAtomicAbstractUpdateRequest req = singleReq.processPrimaryResponse(nodeId);
 
                     if (req != null) {
+                        rcvAll = true;
+
                         GridNearAtomicUpdateResponse res = primaryFailedResponse(req);
 
                         singleReq.onPrimaryResponse(res, cctx);
@@ -181,7 +185,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     }
                 }
                 else {
-                    singleReq.onDhtNodeLeft(nodeId);
+                    DhtLeftResult res = singleReq.onDhtNodeLeft(nodeId);
+
+                    if (res == DhtLeftResult.DONE)
+                        rcvAll = true;
+                    else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY)
+                        checkReqs = Collections.singletonList(new GridNearAtomicCheckUpdateRequest(singleReq.req));
                 }
 
                 if (rcvAll) {
@@ -215,7 +224,16 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                         }
                     }
                     else {
-                        reqState.onDhtNodeLeft(nodeId);
+                        DhtLeftResult res = reqState.onDhtNodeLeft(nodeId);
+
+                        if (res == DhtLeftResult.DONE)
+                            reqDone = true;
+                        else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY) {
+                            if (checkReqs == null)
+                                checkReqs = new ArrayList<>();
+
+                            checkReqs.add(new GridNearAtomicCheckUpdateRequest(reqState.req));
+                        }
                     }
 
                     if (reqDone) {
@@ -237,18 +255,19 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             }
         }
 
-        if (rcvAll)
+        if (checkReqs != null) {
+            assert !rcvAll;
+
+            for (int i = 0; i < checkReqs.size(); i++)
+                sendCheckUpdateRequest(checkReqs.get(i));
+        }
+        else if (rcvAll)
             finishUpdateFuture(opRes0, err0, remapTopVer0);
 
         return false;
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
-        return null;
-    }
-
-    /** {@inheritDoc} */
     @SuppressWarnings("ConstantConditions")
     @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
         assert res == null || res instanceof GridCacheReturn;
@@ -824,25 +843,33 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         CachePartialUpdateCheckedException err0 = null;
         AffinityTopologyVersion remapTopVer0 = null;
 
+        List<GridNearAtomicCheckUpdateRequest> checkReqs = null;
+
+        boolean rcvAll = false;
+
         synchronized (mux) {
             if (this.futId == null || !this.futId.equals(futId))
                 return false;
 
             if (singleReq != null) {
-                if (singleReq.checkDhtNodes(cctx)) {
+                DhtLeftResult res = singleReq.checkDhtNodes(cctx);
+
+                if (res == DhtLeftResult.DONE) {
                     opRes0 = opRes;
                     err0 = err;
                     remapTopVer0 = onAllReceived();
                 }
+                else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY)
+                    checkReqs = Collections.singletonList(new GridNearAtomicCheckUpdateRequest(singleReq.req));
                 else
                     return true;
             }
             else {
                 if (mappings != null) {
-                    boolean rcvAll = false;
-
                     for (PrimaryRequestState reqState : mappings.values()) {
-                        if (reqState.checkDhtNodes(cctx)) {
+                        DhtLeftResult res = reqState.checkDhtNodes(cctx);
+
+                        if (res == DhtLeftResult.DONE) {
                             assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
 
                             resCnt++;
@@ -857,19 +884,34 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                                 break;
                             }
                         }
-                    }
+                        else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY) {
+                            if (checkReqs == null)
+                                checkReqs = new ArrayList<>(mappings.size());
 
-                    if (!rcvAll)
-                        return true;
+                            checkReqs.add(new GridNearAtomicCheckUpdateRequest(reqState.req));
+                        }
+                    }
                 }
                 else
                     return true;
             }
         }
 
-        finishUpdateFuture(opRes0, err0, remapTopVer0);
+        if (checkReqs != null) {
+            assert !rcvAll;
 
-        return false;
+            for (int i = 0; i < checkReqs.size(); i++)
+                sendCheckUpdateRequest(checkReqs.get(i));
+
+            return false;
+        }
+        else if (rcvAll) {
+            finishUpdateFuture(opRes0, err0, remapTopVer0);
+
+            return false;
+        }
+
+        return true;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a334fd5..34b8540 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -783,7 +783,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             catch (IgniteFutureTimeoutCheckedException ignored) {
                 // Print pending transactions and locks that might have led to hang.
                 if (dumpedObjects < DUMP_PENDING_OBJECTS_THRESHOLD) {
-                    dumpPendingObjects();
+                    synchronized (getClass()) {
+                        dumpPendingObjects();
+                    }
 
                     dumpedObjects++;
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index 081e49f..222620a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -1750,7 +1750,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
         for (int i = 0; i < SRV_CNT; i++)
             startGrid(i);
 
-        final int CLIENT_CNT = 4;
+        final int CLIENT_CNT = 1;
 
         final List<Ignite> clients = new ArrayList<>();
 
@@ -1768,7 +1768,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
 
         final AtomicInteger threadIdx = new AtomicInteger(0);
 
-        final int THREADS = CLIENT_CNT * 3;
+        final int THREADS = CLIENT_CNT * 1;
 
         final ConcurrentHashSet<Integer> putKeys = new ConcurrentHashSet<>();
 
@@ -1834,6 +1834,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
                                     }
                                 }
                                 else
+                                    //cache.put(map.keySet().iterator().next(), map.values().iterator().next());
                                     cache.putAll(map);
 
                                 putKeys.addAll(map.keySet());


[2/3] ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-2.0' into ignite-4705-2

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-2.0' into ignite-4705-2


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ff55b4d2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ff55b4d2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ff55b4d2

Branch: refs/heads/ignite-4705-2
Commit: ff55b4d265f9af2eb85f61f245b0c4c3e4598c18
Parents: 4ab159d 93e1996
Author: sboikov <sb...@gridgain.com>
Authored: Sat Mar 4 12:05:07 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Sat Mar 4 12:05:07 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/GridJobSiblingImpl.java     |   4 +-
 .../internal/managers/GridManagerAdapter.java   |   2 +-
 .../checkpoint/GridCheckpointManager.java       |   2 +-
 .../managers/communication/GridIoManager.java   | 206 +++++++------------
 .../deployment/GridDeploymentCommunication.java |   6 +-
 .../eventstorage/GridEventStorageManager.java   |   6 +-
 .../processors/cache/GridCacheIoManager.java    |   8 +-
 .../cache/transactions/IgniteTxManager.java     |   6 +-
 .../clock/GridClockSyncProcessor.java           |   2 +-
 .../continuous/GridContinuousProcessor.java     |   2 +-
 .../datastreamer/DataStreamProcessor.java       |   2 +-
 .../datastreamer/DataStreamerImpl.java          |   2 +-
 .../internal/processors/igfs/IgfsContext.java   |   8 +-
 .../processors/job/GridJobProcessor.java        |   4 +-
 .../internal/processors/job/GridJobWorker.java  |   2 +-
 .../marshaller/ClientRequestFuture.java         |   2 +-
 .../GridMarshallerMappingProcessor.java         |   2 +-
 .../handlers/task/GridTaskCommandHandler.java   |   4 +-
 .../processors/task/GridTaskProcessor.java      |   2 +-
 .../processors/task/GridTaskWorker.java         |   4 +-
 .../GridCommunicationSendMessageSelfTest.java   |   2 +-
 .../communication/GridIoManagerSelfTest.java    |  28 +--
 .../nio/IgniteExceptionInNioWorkerSelfTest.java |   2 +-
 .../communication/GridIoManagerBenchmark.java   |   4 +-
 .../communication/GridIoManagerBenchmark0.java  |  14 +-
 .../communication/GridCacheMessageSelfTest.java |   2 +-
 .../hadoop/shuffle/HadoopShuffle.java           |   3 +-
 .../processors/query/h2/IgniteH2Indexing.java   |   2 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |   6 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |   2 +-
 30 files changed, 140 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ff55b4d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/ff55b4d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/ff55b4d2/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
----------------------------------------------------------------------