You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2017/03/03 09:34:19 UTC

[1/7] ignite git commit: Revert: ignite-3727 Support local listeners async execution for IgniteMessage.send

Repository: ignite
Updated Branches:
  refs/heads/master 684dc7be1 -> de2a1588e


Revert: ignite-3727 Support local listeners async execution for IgniteMessage.send


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9d811f1b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9d811f1b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9d811f1b

Branch: refs/heads/master
Commit: 9d811f1b56a07a463f9ca88535581097477929b2
Parents: bea863f
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Wed Mar 1 19:20:18 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Wed Mar 1 19:20:18 2017 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteMessaging.java |  13 +-
 .../ignite/internal/IgniteMessagingImpl.java    |   8 +-
 .../internal/managers/GridManagerAdapter.java   |   2 +-
 .../managers/communication/GridIoManager.java   |  55 +-
 .../communication/GridIoManagerSelfTest.java    |   8 +-
 ...niteMessagingConfigVariationFullApiTest.java | 195 ++-----
 .../ignite/messaging/GridMessagingSelfTest.java | 116 +---
 .../messaging/IgniteMessagingSendAsyncTest.java | 544 -------------------
 .../ignite/testsuites/IgniteBasicTestSuite.java |   2 -
 .../hadoop/shuffle/HadoopShuffle.java           |   4 +-
 10 files changed, 77 insertions(+), 870 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9d811f1b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
index e64ded5..ff52ed8 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
@@ -77,10 +77,6 @@ public interface IgniteMessaging extends IgniteAsyncSupport {
 
     /**
      * Sends given message with specified topic to the nodes in the underlying cluster group.
-     * <p>
-     * By default all local listeners will be executed in the calling thread, or if you use
-     * {@link #withAsync()}, listeners will execute in public thread pool (in this case it is user's
-     * responsibility to implement back-pressure and limit number of concurrently executed async messages).
      *
      * @param topic Topic to send to, {@code null} for default topic.
      * @param msg Message to send.
@@ -91,10 +87,6 @@ public interface IgniteMessaging extends IgniteAsyncSupport {
 
     /**
      * Sends given messages with the specified topic to the nodes in the underlying cluster group.
-     * <p>
-     * By default all local listeners will be executed in the calling thread, or if you use
-     * {@link #withAsync()}, listeners will execute in public thread pool (in this case it is user's
-     * responsibility to implement back-pressure and limit number of concurrently executed async messages).
      *
      * @param topic Topic to send to, {@code null} for default topic.
      * @param msgs Messages to send. Order of the sending is undefined. If the method produces
@@ -107,8 +99,7 @@ public interface IgniteMessaging extends IgniteAsyncSupport {
     /**
      * Sends given message with specified topic to the nodes in the underlying cluster group. Messages sent with
      * this method will arrive in the same order they were sent. Note that if a topic is used
-     * for ordered messages, then it cannot be reused for non-ordered messages. Note that local listeners
-     * are always executed in public thread pool, no matter default or {@link #withAsync()} mode is used.
+     * for ordered messages, then it cannot be reused for non-ordered messages.
      * <p>
      * The {@code timeout} parameter specifies how long an out-of-order message will stay in a queue,
      * waiting for messages that are ordered ahead of it to arrive. If timeout expires, then all ordered
@@ -171,4 +162,4 @@ public interface IgniteMessaging extends IgniteAsyncSupport {
 
     /** {@inheritDoc} */
     @Override IgniteMessaging withAsync();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d811f1b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
index 541fad4..6b33aa5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
@@ -86,7 +86,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging>
             if (snapshot.isEmpty())
                 throw U.emptyTopologyException();
 
-            ctx.io().sendUserMessage(snapshot, msg, topic, false, 0, isAsync());
+            ctx.io().sendUserMessage(snapshot, msg, topic, false, 0);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -111,7 +111,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging>
             for (Object msg : msgs) {
                 A.notNull(msg, "msg");
 
-                ctx.io().sendUserMessage(snapshot, msg, topic, false, 0, isAsync());
+                ctx.io().sendUserMessage(snapshot, msg, topic, false, 0);
             }
         }
         catch (IgniteCheckedException e) {
@@ -137,7 +137,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging>
             if (timeout == 0)
                 timeout = ctx.config().getNetworkTimeout();
 
-            ctx.io().sendUserMessage(snapshot, msg, topic, true, timeout, false);
+            ctx.io().sendUserMessage(snapshot, msg, topic, true, timeout);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -254,4 +254,4 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging>
     protected Object readResolve() throws ObjectStreamException {
         return prj.message();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d811f1b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 5992eda..584cc56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -390,7 +390,7 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                             if (msg instanceof Message)
                                 ctx.io().send(node, topic, (Message)msg, SYSTEM_POOL);
                             else
-                                ctx.io().sendUserMessage(Collections.singletonList(node), msg, topic, false, 0, false);
+                                ctx.io().sendUserMessage(Collections.singletonList(node), msg, topic, false, 0);
                         }
                         catch (IgniteCheckedException e) {
                             throw unwrapException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d811f1b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 84b4543..7ef7bc0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -785,8 +785,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 finally {
                     threadProcessingMessage(false);
 
-                    if (msgC != null)
-                        msgC.run();
+                    msgC.run();
                 }
             }
 
@@ -1238,7 +1237,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param timeout Timeout.
      * @param skipOnTimeout Whether message can be skipped on timeout.
      * @param ackC Ack closure.
-     * @param async If {@code true} message for local node will be processed in pool, otherwise in current thread.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     private void send(
@@ -1250,8 +1248,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         boolean ordered,
         long timeout,
         boolean skipOnTimeout,
-        IgniteInClosure<IgniteException> ackC,
-        boolean async
+        IgniteInClosure<IgniteException> ackC
     ) throws IgniteCheckedException {
         assert node != null;
         assert topic != null;
@@ -1269,11 +1266,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
             if (ordered)
                 processOrderedMessage(locNodeId, ioMsg, plc, null);
-            else if (async) {
-                assert msg instanceof GridIoUserMessage : ioMsg; // Async execution was added only for IgniteMessaging.
-
-                processRegularMessage(locNodeId, ioMsg, plc, null);
-            }
             else
                 processRegularMessage0(ioMsg, locNodeId);
 
@@ -1331,7 +1323,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
 
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
     }
 
     /**
@@ -1343,7 +1335,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, Object topic, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, -1, msg, plc, false, 0, false, null, false);
+        send(node, topic, -1, msg, plc, false, 0, false, null);
     }
 
     /**
@@ -1351,12 +1343,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param topic Topic to send the message to.
      * @param msg Message to send.
      * @param plc Type of processing.
-     * @param async Async flag.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async)
+    public void send(ClusterNode node, GridTopic topic, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, async);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
     }
 
     /**
@@ -1369,7 +1360,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, topicOrd, msg, plc, false, 0, false, null, false);
+        send(node, topic, topicOrd, msg, plc, false, 0, false, null);
     }
 
     /**
@@ -1391,7 +1382,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
     }
 
     /**
@@ -1418,7 +1409,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
     }
 
     /**
@@ -1431,7 +1422,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, GridTopic topic, Message msg, byte plc,
         IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC, false);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC);
     }
 
     /**
@@ -1467,7 +1458,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC)
         throws IgniteCheckedException {
-        send(node, topic, -1, msg, plc, false, 0, false, ackC, false);
+        send(node, topic, -1, msg, plc, false, 0, false, ackC);
     }
 
     /**
@@ -1523,10 +1514,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC);
     }
 
-    /**
+     /**
      * Sends a peer deployable user message.
      *
      * @param nodes Destination nodes.
@@ -1534,7 +1525,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg) throws IgniteCheckedException {
-        sendUserMessage(nodes, msg, null, false, 0, false);
+        sendUserMessage(nodes, msg, null, false, 0);
     }
 
     /**
@@ -1545,12 +1536,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param topic Message topic to use.
      * @param ordered Is message ordered?
      * @param timeout Message timeout in milliseconds for ordered messages.
-     * @param async Async flag.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     @SuppressWarnings("ConstantConditions")
     public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg,
-        @Nullable Object topic, boolean ordered, long timeout, boolean async) throws IgniteCheckedException {
+        @Nullable Object topic, boolean ordered, long timeout) throws IgniteCheckedException {
         boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(locNodeId);
 
         byte[] serMsg = null;
@@ -1595,7 +1585,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (ordered)
             sendOrderedMessage(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true);
         else if (loc)
-            send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async);
+            send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
         else {
             ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId));
 
@@ -1604,11 +1594,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             if (!rmtNodes.isEmpty())
                 send(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
 
-            // Will call local listeners in current thread synchronously or through pool,
-            // depending async flag, so must go the last
+            // Will call local listeners in current thread synchronously, so must go the last
             // to allow remote nodes execute the requested operation in parallel.
             if (locNode != null)
-                send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async);
+                send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
         }
     }
 
@@ -1675,7 +1664,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC);
     }
 
     /**
@@ -1712,7 +1701,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             // messages to one node vs. many.
             if (!nodes.isEmpty()) {
                 for (ClusterNode node : nodes)
-                    send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null, false);
+                    send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null);
             }
             else if (log.isDebugEnabled())
                 log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +
@@ -1940,7 +1929,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (rmv && log.isDebugEnabled())
             log.debug("Removed message listener [topic=" + topic + ", lsnr=" + lsnr + ']');
 
-        if (lsnr instanceof ArrayListener) {
+        if (lsnr instanceof ArrayListener)
+        {
             for (GridMessageListener childLsnr : ((ArrayListener)lsnr).arr)
                 closeListener(childLsnr);
         }
@@ -1952,7 +1942,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
     /**
      * Closes a listener, if applicable.
-     *
      * @param lsnr Listener.
      */
     private void closeListener(GridMessageListener lsnr) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d811f1b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
index f5499d3..2039d81 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
@@ -145,7 +145,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
         GridIoManager ioMgr = spy(new TestGridIoManager(ctx));
 
         try {
-            ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, false, 123L, false);
+            ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, false, 123L);
         }
         catch (IgniteCheckedException ignored) {
             // No-op. We are using mocks so real sending is impossible.
@@ -169,7 +169,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
         GridIoManager ioMgr = spy(new TestGridIoManager(ctx));
 
         try {
-            ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, true, 123L, false);
+            ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, true, 123L);
         }
         catch (Exception ignored) {
             // No-op. We are using mocks so real sending is impossible.
@@ -196,7 +196,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async)
+        @Override public void send(ClusterNode node, GridTopic topic, Message msg, byte plc)
             throws IgniteCheckedException {
             // No-op.
         }
@@ -257,4 +257,4 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
             return 0;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d811f1b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
index 49aab10..31b0663 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
@@ -58,18 +58,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
     public void testLocalServer() throws Exception {
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                localServerInternal(false);
-            }
-        });
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testLocalServerAsync() throws Exception {
-        runInAllDataModes(new TestRunnable() {
-            @Override public void run() throws Exception {
-                localServerInternal(true);
+                localServerInternal();
             }
         });
     }
@@ -94,21 +83,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
 
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                serverClientMessage(false);
-            }
-        });
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testServerClientMessageAsync() throws Exception {
-        if (!testsCfg.withClients())
-            return;
-
-        runInAllDataModes(new TestRunnable() {
-            @Override public void run() throws Exception {
-                serverClientMessage(true);
+                serverClientMessage();
             }
         });
     }
@@ -122,21 +97,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
 
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                clientClientMessage(false);
-            }
-        });
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testClientClientMessageAsync() throws Exception {
-        if (!testsCfg.withClients())
-            return;
-
-        runInAllDataModes(new TestRunnable() {
-            @Override public void run() throws Exception {
-                clientClientMessage(true);
+                clientClientMessage();
             }
         });
     }
@@ -150,21 +111,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
 
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                clientServerMessage(false);
-            }
-        });
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testClientServerMessageAsync() throws Exception {
-        if (!testsCfg.withClients())
-            return;
-
-        runInAllDataModes(new TestRunnable() {
-            @Override public void run() throws Exception {
-                clientServerMessage(true);
+                clientServerMessage();
             }
         });
     }
@@ -186,18 +133,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
     public void testOrderedMessage() throws Exception {
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                orderedMessage(false);
-            }
-        });
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testOrderedMessageAsync() throws Exception {
-        runInAllDataModes(new TestRunnable() {
-            @Override public void run() throws Exception {
-                orderedMessage(true);
+                orderedMessage();
             }
         });
     }
@@ -211,7 +147,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
 
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                clientServerOrderedMessage(false);
+                clientServerOrderedMessage();
             }
         });
     }
@@ -219,42 +155,13 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
     /**
      * @throws Exception If failed.
      */
-    public void testClientServerOrderedMessageAsync() throws Exception {
-        if (!testsCfg.withClients())
-            return;
-
-        runInAllDataModes(new TestRunnable() {
-            @Override public void run() throws Exception {
-                clientServerOrderedMessage(true);
-            }
-        });
-    }
-
-
-    /**
-     * @throws Exception If failed.
-     */
     public void testClientClientOrderedMessage() throws Exception {
         if (!testsCfg.withClients())
             return;
 
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                clientClientOrderedMessage(false);
-            }
-        });
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testClientClientOrderedMessageAsync() throws Exception {
-        if (!testsCfg.withClients())
-            return;
-
-        runInAllDataModes(new TestRunnable() {
-            @Override public void run() throws Exception {
-                clientClientOrderedMessage(true);
+                clientClientOrderedMessage();
             }
         });
     }
@@ -268,32 +175,16 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
 
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                serverClientOrderedMessage(false);
-            }
-        });
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testServerClientOrderedMessageAsync() throws Exception {
-        if (!testsCfg.withClients())
-            return;
-
-        runInAllDataModes(new TestRunnable() {
-            @Override public void run() throws Exception {
-                serverClientOrderedMessage(true);
+                serverClientOrderedMessage();
             }
         });
     }
 
     /**
      * Single server test.
-     *
-     * @param async Async message send flag.
      * @throws Exception If failed.
      */
-    private void localServerInternal(boolean async) throws Exception {
+    private void localServerInternal() throws Exception {
         int messages = MSGS;
 
         Ignite ignite = grid(SERVER_NODE_IDX);
@@ -306,7 +197,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
 
         try {
             for (int i = 0; i < messages; i++)
-                sendMessage(ignite, grp, value(i), async);
+                sendMessage(ignite, grp, value(i));
 
             assertTrue(LATCH.await(10, TimeUnit.SECONDS));
 
@@ -347,59 +238,52 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
 
     /**
      * Server sends a message and client receives it.
-     *
-     * @param async Async message send flag.
      * @throws Exception If failed.
      */
-    private void serverClientMessage(boolean async) throws Exception {
+    private void serverClientMessage() throws Exception {
         Ignite ignite = grid(SERVER_NODE_IDX);
 
         ClusterGroup grp = ignite.cluster().forClients();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendMessages(ignite, grp, async);
+        registerListenerAndSendMessages(ignite, grp);
     }
 
     /**
      * Client sends a message and client receives it.
-     *
-     * @param async Async message send flag.
      * @throws Exception If failed.
      */
-    private void clientClientMessage(boolean async) throws Exception {
+    private void clientClientMessage() throws Exception {
         Ignite ignite = grid(CLIENT_NODE_IDX);
 
         ClusterGroup grp = ignite.cluster().forClients();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendMessages(ignite, grp, async);
+        registerListenerAndSendMessages(ignite, grp);
     }
 
     /**
      * Client sends a message and client receives it.
-     *
-     * @param async Async message send flag.
      * @throws Exception If failed.
      */
-    private void clientServerMessage(boolean async) throws Exception {
+    private void clientServerMessage() throws Exception {
         Ignite ignite = grid(CLIENT_NODE_IDX);
 
         ClusterGroup grp = ignite.cluster().forServers();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendMessages(ignite, grp, async);
+        registerListenerAndSendMessages(ignite, grp);
     }
 
     /**
      * @param ignite Ignite.
      * @param grp Cluster group.
-     * @param async Async message send flag.
      * @throws Exception If fail.
      */
-    private void registerListenerAndSendMessages(Ignite ignite, ClusterGroup grp, boolean async) throws Exception {
+    private void registerListenerAndSendMessages(Ignite ignite, ClusterGroup grp) throws Exception {
         int messages = MSGS;
 
         LATCH = new CountDownLatch(grp.nodes().size() * messages);
@@ -408,7 +292,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
 
         try {
             for (int i = 0; i < messages; i++)
-                sendMessage(ignite, grp, value(i), async);
+                sendMessage(ignite, grp, value(i));
 
             assertTrue(LATCH.await(10, TimeUnit.SECONDS));
 
@@ -451,68 +335,67 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
     }
 
     /**
-     * @param async Async message send flag.
+     *
      * @throws Exception If fail.
      */
-    private void orderedMessage(boolean async) throws Exception {
+    private void orderedMessage() throws Exception {
         Ignite ignite = grid(SERVER_NODE_IDX);
 
         ClusterGroup grp = gridCount() > 1 ? ignite.cluster().forRemotes() : ignite.cluster().forLocal();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendOrderedMessages(ignite, grp, async);
+        registerListenerAndSendOrderedMessages(ignite, grp);
     }
 
     /**
-     * @param async Async message send flag.
+     *
      * @throws Exception If fail.
      */
-    private void clientServerOrderedMessage(boolean async) throws Exception {
+    private void clientServerOrderedMessage() throws Exception {
         Ignite ignite = grid(CLIENT_NODE_IDX);
 
         ClusterGroup grp = ignite.cluster().forServers();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendOrderedMessages(ignite, grp, async);
+        registerListenerAndSendOrderedMessages(ignite, grp);
     }
 
     /**
-     * @param async Async message send flag.
+     *
      * @throws Exception If fail.
      */
-    private void clientClientOrderedMessage(boolean async) throws Exception {
+    private void clientClientOrderedMessage() throws Exception {
         Ignite ignite = grid(CLIENT_NODE_IDX);
 
         ClusterGroup grp = ignite.cluster().forClients();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendOrderedMessages(ignite, grp, async);
+        registerListenerAndSendOrderedMessages(ignite, grp);
     }
 
     /**
-     * @param async Async message send flag.
+     *
      * @throws Exception If fail.
      */
-    private void serverClientOrderedMessage(boolean async) throws Exception {
+    private void serverClientOrderedMessage() throws Exception {
         Ignite ignite = grid(SERVER_NODE_IDX);
 
         ClusterGroup grp = ignite.cluster().forClients();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendOrderedMessages(ignite, grp, async);
+        registerListenerAndSendOrderedMessages(ignite, grp);
     }
 
     /**
      * @param ignite Ignite.
      * @param grp Cluster group.
-     * @param async Async message send flag.
      * @throws Exception If fail.
      */
-    private void registerListenerAndSendOrderedMessages(Ignite ignite, ClusterGroup grp, boolean async) throws Exception {
+    private void registerListenerAndSendOrderedMessages(Ignite ignite, ClusterGroup grp) throws Exception {
         int messages = MSGS;
 
         LATCH = new CountDownLatch(grp.nodes().size() * messages);
@@ -520,12 +403,8 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
         UUID opId = ignite.message(grp).remoteListen(MESSAGE_TOPIC, new OrderedMessageListener());
 
         try {
-            for (int i=0; i < messages; i++){
-                if (async)
-                    ignite.message(grp).withAsync().sendOrdered(MESSAGE_TOPIC, value(i), 2000);
-                else
-                    ignite.message(grp).sendOrdered(MESSAGE_TOPIC, value(i), 2000);
-            }
+            for (int i=0; i < messages; i++)
+                ignite.message(grp).sendOrdered(MESSAGE_TOPIC, value(i), 2000);
 
             assertTrue(LATCH.await(10, TimeUnit.SECONDS));
 
@@ -540,13 +419,9 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
      * @param nodeSnd Sender Ignite node.
      * @param grp Cluster group.
      * @param msg Message.
-     * @param async Async message send flag.
      */
-    private void sendMessage(Ignite nodeSnd, ClusterGroup grp, Object msg, boolean async) {
-        if (async)
-            nodeSnd.message(grp).withAsync().send(MESSAGE_TOPIC, msg);
-        else
-            nodeSnd.message(grp).send(MESSAGE_TOPIC, msg);
+    private void sendMessage(Ignite nodeSnd, ClusterGroup grp, Object msg) {
+        nodeSnd.message(grp).send(MESSAGE_TOPIC, msg);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d811f1b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
index a166c3d..5a0dfa2 100644
--- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
@@ -24,7 +24,6 @@ import java.io.ObjectOutput;
 import java.io.Serializable;
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -37,20 +36,15 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteMessaging;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage;
-import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.typedef.P2;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.resources.IgniteInstanceResource;;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -204,7 +198,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        TestTcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi();
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
 
         discoSpi.setIpFinder(ipFinder);
 
@@ -950,7 +944,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
      * @throws Exception If error occurs.
      */
     public void testSendMessageWithExternalClassLoader() throws Exception {
-        URL[] urls = new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))};
+        URL[] urls = new URL[] { new URL(GridTestProperties.getProperty("p2p.uri.cls")) };
 
         ClassLoader extLdr = new URLClassLoader(urls);
 
@@ -1034,8 +1028,6 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
     public void testAsync() throws Exception {
         final AtomicInteger msgCnt = new AtomicInteger();
 
-        TestTcpDiscoverySpi discoSpi = (TestTcpDiscoverySpi)ignite2.configuration().getDiscoverySpi();
-
         assertFalse(ignite2.message().isAsync());
 
         final IgniteMessaging msg = ignite2.message().withAsync();
@@ -1052,8 +1044,6 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
             }
         }, IllegalStateException.class, null);
 
-        discoSpi.blockCustomEvent();
-
         final String topic = "topic";
 
         UUID id = msg.remoteListen(topic, new P2<UUID, Object>() {
@@ -1069,15 +1059,9 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
 
         Assert.assertNull(id);
 
-        IgniteFuture<UUID> starFut = msg.future();
-
-        Assert.assertNotNull(starFut);
-
-        U.sleep(500);
+        IgniteFuture<UUID> fut = msg.future();
 
-        Assert.assertFalse(starFut.isDone());
-
-        discoSpi.stopBlock();
+        Assert.assertNotNull(fut);
 
         GridTestUtils.assertThrows(log, new Callable<Void>() {
             @Override public Void call() throws Exception {
@@ -1087,14 +1071,10 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
             }
         }, IllegalStateException.class, null);
 
-        id = starFut.get();
+        id = fut.get();
 
         Assert.assertNotNull(id);
 
-        Assert.assertTrue(starFut.isDone());
-
-        discoSpi.blockCustomEvent();
-
         message(ignite1.cluster().forRemotes()).send(topic, "msg1");
 
         GridTestUtils.waitForCondition(new PA() {
@@ -1119,16 +1099,8 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
             }
         }, IllegalStateException.class, null);
 
-        U.sleep(500);
-
-        Assert.assertFalse(stopFut.isDone());
-
-        discoSpi.stopBlock();
-
         stopFut.get();
 
-        Assert.assertTrue(stopFut.isDone());
-
         message(ignite1.cluster().forRemotes()).send(topic, "msg2");
 
         U.sleep(1000);
@@ -1137,80 +1109,6 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
     }
 
     /**
-     *
-     */
-    static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
-        /** */
-        private boolean blockCustomEvt;
-
-        /** */
-        private final Object mux = new Object();
-
-        /** */
-        private List<DiscoverySpiCustomMessage> blockedMsgs = new ArrayList<>();
-
-        /** {@inheritDoc} */
-        @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
-            synchronized (mux) {
-                if (blockCustomEvt) {
-                    DiscoveryCustomMessage msg0 = GridTestUtils.getFieldValue(msg, "delegate");
-                    if (msg0 instanceof StopRoutineDiscoveryMessage || msg0 instanceof StartRoutineDiscoveryMessage) {
-                        log.info("Block custom message: " + msg0);
-                        blockedMsgs.add(msg);
-
-                        mux.notifyAll();
-                    }
-                    return;
-                }
-            }
-
-            super.sendCustomEvent(msg);
-        }
-
-        /**
-         *
-         */
-        public void blockCustomEvent() {
-            synchronized (mux) {
-                assert blockedMsgs.isEmpty() : blockedMsgs;
-
-                blockCustomEvt = true;
-            }
-        }
-
-        /**
-         * @throws InterruptedException If interrupted.
-         */
-        public void waitCustomEvent() throws InterruptedException {
-            synchronized (mux) {
-                while (blockedMsgs.isEmpty())
-                    mux.wait();
-            }
-        }
-
-        /**
-         *
-         */
-        public void stopBlock() {
-            List<DiscoverySpiCustomMessage> msgs;
-
-            synchronized (this) {
-                msgs = new ArrayList<>(blockedMsgs);
-
-                blockCustomEvt = false;
-
-                blockedMsgs.clear();
-            }
-
-            for (DiscoverySpiCustomMessage msg : msgs) {
-                log.info("Resend blocked message: " + msg);
-
-                super.sendCustomEvent(msg);
-            }
-        }
-    }
-
-    /**
      * Tests that message listener registers only for one oldest node.
      *
      * @throws Exception If an error occurred.
@@ -1254,4 +1152,4 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
 
         assertEquals(1, MSG_CNT.get());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d811f1b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
deleted file mode 100644
index 75e7d22..0000000
--- a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
+++ /dev/null
@@ -1,544 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.messaging;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteMessaging;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.lang.IgniteBiInClosure;
-import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jsr166.ThreadLocalRandom8;
-import org.junit.Assert;
-
-/**
- *
- */
-public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest implements Serializable {
-    /** */
-    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
-    /** Threads number for multi-thread tests. */
-    private static final int THREADS = 10;
-
-    /** */
-    private final String TOPIC = "topic";
-
-    /** */
-    private final String msgStr = "message";
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids();
-
-        super.afterTest();
-    }
-
-    /**
-     * Checks if use default mode, local listeners execute in the same thread, 1 node in topology.
-     *
-     * @throws Exception If failed.
-     */
-    public void testSendDefaultMode() throws Exception {
-        Ignite ignite1 = startGrid(1);
-
-        send(ignite1.message(), msgStr, new IgniteBiInClosure<String, Thread> () {
-            @Override public void apply(String msg, Thread thread) {
-                Assert.assertEquals(Thread.currentThread(), thread);
-                Assert.assertEquals(msgStr, msg);
-            }
-        });
-    }
-
-    /**
-     * Checks if use async mode, local listeners execute in another thread, 1 node in topology.
-     *
-     * @throws Exception If failed.
-     */
-    public void testSendAsyncMode() throws Exception {
-        Ignite ignite1 = startGrid(1);
-
-        send(ignite1.message().withAsync(), msgStr,  new IgniteBiInClosure<String, Thread> () {
-            @Override public void apply(String msg, Thread thread) {
-                Assert.assertTrue(!Thread.currentThread().equals(thread));
-                Assert.assertEquals(msgStr, msg);
-            }
-        });
-    }
-
-    /**
-     * Checks if use default mode, local listeners execute in the same thread, 2 nodes in topology.
-     *
-     * @throws Exception If failed.
-     */
-    public void testSendDefaultMode2Nodes() throws Exception {
-        Ignite ignite1 = startGrid(1);
-        Ignite ignite2 = startGrid(2);
-
-        sendWith2Nodes(ignite2, ignite1.message(), msgStr, new IgniteBiInClosure<String, Thread> () {
-            @Override public  void apply(String msg, Thread thread) {
-                Assert.assertEquals(Thread.currentThread(), thread);
-                Assert.assertEquals(msgStr, msg);
-            }
-        });
-    }
-
-    /**
-     * Checks if use async mode, local listeners execute in another thread, 2 nodes in topology.
-     *
-     * @throws Exception If failed.
-     */
-    public void testSendAsyncMode2Node() throws Exception {
-        Ignite ignite1 = startGrid(1);
-        Ignite ignite2 = startGrid(2);
-
-        sendWith2Nodes(ignite2, ignite1.message().withAsync(), msgStr,  new IgniteBiInClosure<String, Thread> () {
-            @Override public  void apply(String msg, Thread thread) {
-                Assert.assertTrue(!Thread.currentThread().equals(thread));
-                Assert.assertEquals(msgStr, msg);
-            }
-        });
-    }
-
-    /**
-     * Checks that sendOrdered works in thread pool, 1 node in topology.
-     *
-     * @throws Exception If failed.
-     */
-    public void testSendOrderedDefaultMode() throws Exception {
-        Ignite ignite1 = startGrid(1);
-
-        final List<String> msgs = orderedMessages();
-
-        sendOrdered(ignite1.message(), msgs, new IgniteBiInClosure< List<String>,  List<Thread>> () {
-            @Override public void apply(List<String> received, List<Thread> threads) {
-                assertFalse(threads.contains(Thread.currentThread()));
-                assertTrue(msgs.equals(received));
-            }
-        });
-    }
-
-    /**
-     * Checks that sendOrdered work in thread pool, 1 node in topology.
-     *
-     * @throws Exception If failed.
-     */
-    public void testSendOrderedAsyncMode() throws Exception {
-        Ignite ignite1 = startGrid(1);
-
-        final List<String> msgs = orderedMessages();
-
-        sendOrdered(ignite1.message().withAsync(), msgs, new IgniteBiInClosure< List<String>,  List<Thread>> () {
-            @Override public void apply(List<String> received, List<Thread> threads) {
-                assertFalse(threads.contains(Thread.currentThread()));
-                assertTrue(msgs.equals(received));
-            }
-        });
-    }
-
-    /**
-     * Checks that sendOrdered work in thread pool, 2 nodes in topology.
-     *
-     * @throws Exception If failed.
-     */
-    public void testSendOrderedDefaultMode2Node() throws Exception {
-        Ignite ignite1 = startGrid(1);
-        Ignite ignite2 = startGrid(2);
-
-        final List<String> msgs = orderedMessages();
-
-        sendOrderedWith2Node(ignite2, ignite1.message(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() {
-            @Override public void apply(List<String> received, List<Thread> threads) {
-                assertFalse(threads.contains(Thread.currentThread()));
-                assertTrue(msgs.equals(received));
-            }
-        });
-    }
-
-    /**
-     * Checks that sendOrdered work in thread pool, 2 nodes in topology.
-     *
-     * @throws Exception If failed.
-     */
-    public void testSendOrderedAsyncMode2Node() throws Exception {
-        Ignite ignite1 = startGrid(1);
-        Ignite ignite2 = startGrid(2);
-
-        final List<String> msgs = orderedMessages();
-
-        sendOrderedWith2Node(ignite2, ignite1.message().withAsync(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() {
-            @Override public void apply(List<String> received, List<Thread> threads) {
-                assertFalse(threads.contains(Thread.currentThread()));
-                assertTrue(msgs.equals(received));
-            }
-        });
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSendOrderedDefaultModeMultiThreads() throws Exception {
-        Ignite ignite = startGrid(1);
-
-        sendOrderedMultiThreads(ignite.message());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSendOrderedAsyncModeMultiThreads() throws Exception {
-        Ignite ignite = startGrid(1);
-
-        sendOrderedMultiThreads(ignite.message().withAsync());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSendOrderedDefaultModeMultiThreadsWith2Node() throws Exception {
-        Ignite ignite1 = startGrid(1);
-        Ignite ignite2 = startGrid(2);
-
-        sendOrderedMultiThreadsWith2Node(ignite2, ignite1.message());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSendOrderedAsyncModeMultiThreadsWith2Node() throws Exception {
-        Ignite ignite1 = startGrid(1);
-        Ignite ignite2 = startGrid(2);
-
-        sendOrderedMultiThreadsWith2Node(ignite2, ignite1.message().withAsync());
-    }
-
-    /**
-     * @param ignite2 Second node.
-     * @param ignMsg IgniteMessage.
-     * @throws Exception If failed.
-     */
-    private void sendOrderedMultiThreadsWith2Node(
-            final Ignite ignite2,
-            final IgniteMessaging ignMsg
-    ) throws Exception {
-        final ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap();
-        final ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap();
-
-        final List<String> msgs = orderedMessages();
-
-        sendOrderedMultiThreadsWith2Node(ignite2, ignMsg, expMsg, actlMsg, msgs);
-
-    }
-
-    /**
-     * @param ignMsg IgniteMessaging.
-     * @throws Exception If failed.
-     */
-    private void sendOrderedMultiThreads(
-            final IgniteMessaging ignMsg
-    ) throws Exception {
-        final ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap();
-        final ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap();
-
-        final List<String> msgs = orderedMessages();
-
-        sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs);
-    }
-
-    /**
-     * @param ignite2 Second node.
-     * @param ignMsg Ignite for send message.
-     * @param expMsg Expected messages map.
-     * @param actlMsg Actual message map.
-     * @param msgs List of messages.
-     * @throws Exception If failed.
-     */
-    private void sendOrderedMultiThreadsWith2Node(
-            final Ignite ignite2,
-            final IgniteMessaging ignMsg,
-            final ConcurrentMap<String, List<String>> expMsg,
-            final ConcurrentMap<String, List<String>> actlMsg,
-            final List<String> msgs
-    ) throws Exception {
-        final CountDownLatch latch = new CountDownLatch(THREADS * msgs.size());
-
-        final ConcurrentMap<String, List<String>> actlMsgNode2 = Maps.newConcurrentMap();
-
-        ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() {
-            @Override public boolean apply(UUID uuid, Message msg) {
-                actlMsgNode2.putIfAbsent(msg.threadName, Lists.<String>newArrayList());
-
-                actlMsgNode2.get(msg.threadName).add(msg.msg);
-
-                latch.countDown();
-
-                return true;
-            }
-        });
-
-        sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs);
-
-        latch.await();
-
-        assertEquals(expMsg.size(), actlMsgNode2.size());
-
-        for (Map.Entry<String, List<String>> entry : expMsg.entrySet())
-            assertTrue(actlMsgNode2.get(entry.getKey()).equals(entry.getValue()));
-    }
-
-    /**
-     * @param ignMsg Ignite for send message.
-     * @param expMsg Expected messages map.
-     * @param actlMsg Actual message map.
-     * @param msgs List of messages.
-     * @throws Exception If failed.
-     */
-    private void sendOrderedMultiThreads(
-            final IgniteMessaging ignMsg,
-            final ConcurrentMap<String, List<String>> expMsg,
-            final ConcurrentMap<String, List<String>> actlMsg,
-            final List<String> msgs
-    ) throws Exception {
-        final CountDownLatch latch = new CountDownLatch(THREADS * msgs.size());
-
-        ignMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() {
-            @Override public boolean apply(UUID uuid, Message msg) {
-                actlMsg.putIfAbsent(msg.threadName, Lists.<String>newArrayList());
-
-                actlMsg.get(msg.threadName).add(msg.msg);
-
-                latch.countDown();
-
-                return true;
-            }
-        });
-
-        for (int i = 0; i < THREADS; i++)
-            new Thread(new Runnable() {
-                @Override public void run() {
-                    String thdName = Thread.currentThread().getName();
-
-                    List<String> exp = Lists.newArrayList();
-
-                    expMsg.put(thdName, exp);
-
-                    for (String msg : msgs) {
-                        exp.add(msg);
-
-                        ignMsg.sendOrdered(TOPIC, new Message(thdName, msg), 1000);
-                    }
-
-                }
-            }).start();
-
-        latch.await();
-
-        assertEquals(expMsg.size(), actlMsg.size());
-
-        for (Map.Entry<String, List<String>> entry : expMsg.entrySet())
-            assertTrue(actlMsg.get(entry.getKey()).equals(entry.getValue()));
-    }
-
-    /**
-     * @param ignite2 Second node.
-     * @param igniteMsg Ignite message.
-     * @param msgStr    Message string.
-     * @param cls       Callback for compare result.
-     * @throws Exception If failed.
-     */
-    private void sendWith2Nodes(
-            final Ignite ignite2,
-            final IgniteMessaging igniteMsg,
-            final String msgStr,
-            final IgniteBiInClosure<String, Thread>  cls
-    ) throws Exception {
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
-            @Override public boolean apply(UUID uuid, String msg) {
-                Assert.assertEquals(msgStr, msg);
-
-                latch.countDown();
-
-                return true;
-            }
-        });
-
-        send(igniteMsg, msgStr, cls);
-
-        latch.await();
-    }
-
-    /**
-     * @param igniteMsg Ignite messaging.
-     * @param msgStr    Message string.
-     * @param cls       Callback for compare result.
-     * @throws Exception If failed.
-     */
-    private void send(
-           final IgniteMessaging igniteMsg,
-           final String msgStr,
-           final IgniteBiInClosure<String, Thread> cls
-    ) throws Exception {
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        final AtomicReference<Thread> thread = new AtomicReference<>();
-        final AtomicReference<String> val = new AtomicReference<>();
-
-        igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
-            @Override public boolean apply(UUID uuid, String msgStr) {
-                thread.set(Thread.currentThread());
-
-                val.set(msgStr);
-
-                latch.countDown();
-
-                return true;
-            }
-        });
-
-        igniteMsg.send(TOPIC, msgStr);
-
-        latch.await();
-
-        cls.apply(val.get(), thread.get());
-    }
-
-    /**
-     * @param ignite2 Second node.
-     * @param igniteMsg Ignite message.
-     * @param msgs messages for send.
-     * @param cls  Callback for compare result.
-     * @throws Exception If failed.
-     */
-    private void sendOrderedWith2Node(
-            final Ignite ignite2,
-            final IgniteMessaging igniteMsg,
-            final List<String> msgs,
-            final IgniteBiInClosure<List<String>, List<Thread>> cls
-    ) throws Exception {
-        final CountDownLatch latch = new CountDownLatch(msgs.size());
-
-        final List<String> received = Lists.newArrayList();
-
-        ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
-            @Override public boolean apply(UUID uuid, String msg) {
-                received.add(msg);
-
-                latch.countDown();
-
-                return true;
-            }
-        });
-
-        sendOrdered(igniteMsg, msgs, cls);
-
-        latch.await();
-
-        assertTrue(msgs.equals(received));
-    }
-
-    /**
-     * @param igniteMsg Ignite message.
-     * @param msgs  messages for send.
-     * @param cls Callback for compare result.
-     * @throws Exception If failed.
-     */
-    private<T> void sendOrdered(
-            final IgniteMessaging igniteMsg,
-            final List<T> msgs,
-            final IgniteBiInClosure<List<T>,List<Thread>> cls
-    ) throws Exception {
-        final CountDownLatch latch = new CountDownLatch(msgs.size());
-
-        final List<T> received = Lists.newArrayList();
-        final List<Thread> threads = Lists.newArrayList();
-
-        for (T msg : msgs)
-            igniteMsg.sendOrdered(TOPIC, msg, 1000);
-
-        igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, T>() {
-            @Override public boolean apply(UUID uuid, T s) {
-                received.add(s);
-
-                threads.add(Thread.currentThread());
-
-                latch.countDown();
-
-                return true;
-            }
-        });
-
-        latch.await();
-
-        cls.apply(received, threads);
-    }
-
-    /**
-     * @return List of ordered messages
-     */
-    private List<String> orderedMessages() {
-        final List<String> msgs = Lists.newArrayList();
-
-        for (int i = 0; i < 1000; i++)
-            msgs.add(String.valueOf(ThreadLocalRandom8.current().nextInt()));
-
-        return msgs;
-    }
-
-    /**
-     *
-     */
-    private static class Message implements Serializable{
-        /** Thread name. */
-        private final String threadName;
-
-        /** Message. */
-        private final String msg;
-
-        /**
-         * @param threadName Thread name.
-         * @param msg Message.
-         */
-        private Message(String threadName, String msg) {
-            this.threadName = threadName;
-            this.msg = msg;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d811f1b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 688edf7..9e20d2a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -56,7 +56,6 @@ import org.apache.ignite.marshaller.DynamicProxySerializationMultiJvmSelfTest;
 import org.apache.ignite.marshaller.MarshallerContextSelfTest;
 import org.apache.ignite.messaging.GridMessagingNoPeerClassLoadingSelfTest;
 import org.apache.ignite.messaging.GridMessagingSelfTest;
-import org.apache.ignite.messaging.IgniteMessagingSendAsyncTest;
 import org.apache.ignite.messaging.IgniteMessagingWithClientTest;
 import org.apache.ignite.plugin.security.SecurityPermissionSetBuilderTest;
 import org.apache.ignite.spi.GridSpiLocalHostInjectionTest;
@@ -102,7 +101,6 @@ public class IgniteBasicTestSuite extends TestSuite {
         suite.addTest(new TestSuite(GridSelfTest.class));
         suite.addTest(new TestSuite(ClusterGroupHostsSelfTest.class));
         suite.addTest(new TestSuite(IgniteMessagingWithClientTest.class));
-        suite.addTest(new TestSuite(IgniteMessagingSendAsyncTest.class));
 
         GridTestUtils.addTestIfNeeded(suite, ClusterGroupSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridMessagingSelfTest.class, ignoredTests);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d811f1b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index 3db68c4..10f18a6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -147,7 +147,7 @@ public class HadoopShuffle extends HadoopComponent {
         if (msg instanceof Message)
             ctx.kernalContext().io().send(node, GridTopic.TOPIC_HADOOP_MSG, (Message)msg, GridIoPolicy.PUBLIC_POOL);
         else
-            ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0, false);
+            ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0);
     }
 
     /**
@@ -298,4 +298,4 @@ public class HadoopShuffle extends HadoopComponent {
     public GridUnsafeMemory memory() {
         return mem;
     }
-}
\ No newline at end of file
+}


[7/7] ignite git commit: Merge branch 'ignite-1.9.1'

Posted by av...@apache.org.
Merge branch 'ignite-1.9.1'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/de2a1588
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/de2a1588
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/de2a1588

Branch: refs/heads/master
Commit: de2a1588e89379277011a282e5b0b1aade83dc1e
Parents: 684dc7b 8362fe7
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Mar 3 12:33:14 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Mar 3 12:33:14 2017 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 107 +++++++------------
 .../communication/GridIoManagerSelfTest.java    |   2 +-
 .../ant/beautifier/GridJavadocAntTask.java      |   1 +
 parent/pom.xml                                  |   3 +-
 4 files changed, 40 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/de2a1588/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------


[5/7] ignite git commit: IGNITE-4770 Script inside javadoc bottom cause compilation error at 1.8.0_121-b13

Posted by av...@apache.org.
IGNITE-4770 Script inside javadoc bottom cause compilation error at 1.8.0_121-b13


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0be92732
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0be92732
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0be92732

Branch: refs/heads/master
Commit: 0be92732ec4dcc683b1dbeea6c027c4076579847
Parents: bcdba2a
Author: Vyacheslav Daradur <da...@gmail.com>
Authored: Thu Mar 2 15:57:21 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Mar 2 15:57:21 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/tools/ant/beautifier/GridJavadocAntTask.java    | 1 +
 parent/pom.xml                                                    | 3 +--
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0be92732/modules/tools/src/main/java/org/apache/ignite/tools/ant/beautifier/GridJavadocAntTask.java
----------------------------------------------------------------------
diff --git a/modules/tools/src/main/java/org/apache/ignite/tools/ant/beautifier/GridJavadocAntTask.java b/modules/tools/src/main/java/org/apache/ignite/tools/ant/beautifier/GridJavadocAntTask.java
index e654231..d34b4e4 100644
--- a/modules/tools/src/main/java/org/apache/ignite/tools/ant/beautifier/GridJavadocAntTask.java
+++ b/modules/tools/src/main/java/org/apache/ignite/tools/ant/beautifier/GridJavadocAntTask.java
@@ -282,6 +282,7 @@ public class GridJavadocAntTask extends MatchingTask {
                             "<script type='text/javascript'>" +
                                 "SyntaxHighlighter.all();" +
                                 "dp.SyntaxHighlighter.HighlightAll('code');" +
+                                "!function(d,s,id){var js,fjs=d.getElementsByTagName(s)[0],p=/^http:/.test(d.location)?'http':'https';if(!d.getElementById(id)){js=d.createElement(s);js.id=id;js.src=p+'://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js,fjs);}}(document, 'script', 'twitter-wjs');" +
                             "</script>\n" +
                             "</body>\n");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0be92732/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index a17732d..32da7ec 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -469,7 +469,6 @@
                                         </td>
                                         <td width="100%" align="right" valign="center">
                                             <a href="https://twitter.com/ApacheIgnite" class="twitter-follow-button" data-show-count="false" data-size="large">Follow @ApacheIgnite</a>
-                                            <script>!function(d,s,id){var js,fjs=d.getElementsByTagName(s)[0],p=/^http:/.test(d.location)?'http':'https';if(!d.getElementById(id)){js=d.createElement(s);js.id=id;js.src=p+'://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js,fjs);}}(document, 'script', 'twitter-wjs');</script>
                                         </td>
                                     </tr>
                                     <tr>
@@ -937,7 +936,7 @@
                 <jdk>[1.8,)</jdk>
             </activation>
             <properties>
-                <javadoc.opts>-Xdoclint:none --allow-script-in-comments</javadoc.opts>
+                <javadoc.opts>-Xdoclint:none</javadoc.opts>
             </properties>
             <build>
                 <plugins>


[3/7] ignite git commit: ignite-3727 Resotred fix and fixed issue with wrong method called in GridIoManager.

Posted by av...@apache.org.
ignite-3727 Resotred fix and fixed issue with wrong method called in GridIoManager.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/59ed1d7e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/59ed1d7e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/59ed1d7e

Branch: refs/heads/master
Commit: 59ed1d7ec08a1bb827e7354bb5dca80b86944933
Parents: 9d811f1
Author: sboikov <sb...@gridgain.com>
Authored: Thu Mar 2 13:28:20 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Mar 2 13:28:20 2017 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteMessaging.java |  13 +-
 .../ignite/internal/IgniteMessagingImpl.java    |   8 +-
 .../internal/managers/GridManagerAdapter.java   |   2 +-
 .../managers/communication/GridIoManager.java   | 130 ++---
 .../communication/GridIoManagerSelfTest.java    |   6 +-
 ...niteMessagingConfigVariationFullApiTest.java | 195 +++++--
 .../ignite/messaging/GridMessagingSelfTest.java | 116 +++-
 .../messaging/IgniteMessagingSendAsyncTest.java | 544 +++++++++++++++++++
 .../ignite/testsuites/IgniteBasicTestSuite.java |   2 +
 .../hadoop/shuffle/HadoopShuffle.java           |   4 +-
 10 files changed, 890 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/59ed1d7e/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
index ff52ed8..e64ded5 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
@@ -77,6 +77,10 @@ public interface IgniteMessaging extends IgniteAsyncSupport {
 
     /**
      * Sends given message with specified topic to the nodes in the underlying cluster group.
+     * <p>
+     * By default all local listeners will be executed in the calling thread, or if you use
+     * {@link #withAsync()}, listeners will execute in public thread pool (in this case it is user's
+     * responsibility to implement back-pressure and limit number of concurrently executed async messages).
      *
      * @param topic Topic to send to, {@code null} for default topic.
      * @param msg Message to send.
@@ -87,6 +91,10 @@ public interface IgniteMessaging extends IgniteAsyncSupport {
 
     /**
      * Sends given messages with the specified topic to the nodes in the underlying cluster group.
+     * <p>
+     * By default all local listeners will be executed in the calling thread, or if you use
+     * {@link #withAsync()}, listeners will execute in public thread pool (in this case it is user's
+     * responsibility to implement back-pressure and limit number of concurrently executed async messages).
      *
      * @param topic Topic to send to, {@code null} for default topic.
      * @param msgs Messages to send. Order of the sending is undefined. If the method produces
@@ -99,7 +107,8 @@ public interface IgniteMessaging extends IgniteAsyncSupport {
     /**
      * Sends given message with specified topic to the nodes in the underlying cluster group. Messages sent with
      * this method will arrive in the same order they were sent. Note that if a topic is used
-     * for ordered messages, then it cannot be reused for non-ordered messages.
+     * for ordered messages, then it cannot be reused for non-ordered messages. Note that local listeners
+     * are always executed in public thread pool, no matter default or {@link #withAsync()} mode is used.
      * <p>
      * The {@code timeout} parameter specifies how long an out-of-order message will stay in a queue,
      * waiting for messages that are ordered ahead of it to arrive. If timeout expires, then all ordered
@@ -162,4 +171,4 @@ public interface IgniteMessaging extends IgniteAsyncSupport {
 
     /** {@inheritDoc} */
     @Override IgniteMessaging withAsync();
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/59ed1d7e/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
index 6b33aa5..541fad4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
@@ -86,7 +86,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging>
             if (snapshot.isEmpty())
                 throw U.emptyTopologyException();
 
-            ctx.io().sendUserMessage(snapshot, msg, topic, false, 0);
+            ctx.io().sendUserMessage(snapshot, msg, topic, false, 0, isAsync());
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -111,7 +111,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging>
             for (Object msg : msgs) {
                 A.notNull(msg, "msg");
 
-                ctx.io().sendUserMessage(snapshot, msg, topic, false, 0);
+                ctx.io().sendUserMessage(snapshot, msg, topic, false, 0, isAsync());
             }
         }
         catch (IgniteCheckedException e) {
@@ -137,7 +137,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging>
             if (timeout == 0)
                 timeout = ctx.config().getNetworkTimeout();
 
-            ctx.io().sendUserMessage(snapshot, msg, topic, true, timeout);
+            ctx.io().sendUserMessage(snapshot, msg, topic, true, timeout, false);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -254,4 +254,4 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging>
     protected Object readResolve() throws ObjectStreamException {
         return prj.message();
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/59ed1d7e/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 584cc56..5992eda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -390,7 +390,7 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                             if (msg instanceof Message)
                                 ctx.io().send(node, topic, (Message)msg, SYSTEM_POOL);
                             else
-                                ctx.io().sendUserMessage(Collections.singletonList(node), msg, topic, false, 0);
+                                ctx.io().sendUserMessage(Collections.singletonList(node), msg, topic, false, 0, false);
                         }
                         catch (IgniteCheckedException e) {
                             throw unwrapException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/59ed1d7e/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 7ef7bc0..0b7c790 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -188,6 +188,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /** */
     private final AtomicLong ioTestId = new AtomicLong();
 
+    /** No-op runnable. */
+    private static final IgniteRunnable NOOP = new IgniteRunnable() {
+        @Override public void run() {
+            // No-op.
+        }
+    };
+
     /**
      * @param ctx Grid kernal context.
      */
@@ -1237,6 +1244,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param timeout Timeout.
      * @param skipOnTimeout Whether message can be skipped on timeout.
      * @param ackC Ack closure.
+     * @param async If {@code true} message for local node will be processed in pool, otherwise in current thread.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     private void send(
@@ -1248,11 +1256,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         boolean ordered,
         long timeout,
         boolean skipOnTimeout,
-        IgniteInClosure<IgniteException> ackC
+        IgniteInClosure<IgniteException> ackC,
+        boolean async
     ) throws IgniteCheckedException {
         assert node != null;
         assert topic != null;
         assert msg != null;
+        assert !async || msg instanceof GridIoUserMessage : msg; // Async execution was added only for IgniteMessaging.
 
         GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
 
@@ -1266,6 +1276,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
             if (ordered)
                 processOrderedMessage(locNodeId, ioMsg, plc, null);
+            else if (async)
+                processRegularMessage(locNodeId, ioMsg, plc, NOOP);
             else
                 processRegularMessage0(ioMsg, locNodeId);
 
@@ -1323,7 +1335,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
 
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
     }
 
     /**
@@ -1335,7 +1347,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, Object topic, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, -1, msg, plc, false, 0, false, null);
+        send(node, topic, -1, msg, plc, false, 0, false, null, false);
     }
 
     /**
@@ -1347,7 +1359,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, GridTopic topic, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
     }
 
     /**
@@ -1360,7 +1372,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, topicOrd, msg, plc, false, 0, false, null);
+        send(node, topic, topicOrd, msg, plc, false, 0, false, null, false);
     }
 
     /**
@@ -1382,34 +1394,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
-    }
-
-    /**
-     * @param nodeId Destination node.
-     * @param topic Topic to send the message to.
-     * @param msg Message to send.
-     * @param plc Type of processing.
-     * @param timeout Timeout to keep a message on receiving queue.
-     * @param skipOnTimeout Whether message can be skipped on timeout.
-     * @throws IgniteCheckedException Thrown in case of any errors.
-     */
-    public void sendOrderedMessage(
-        UUID nodeId,
-        Object topic,
-        Message msg,
-        byte plc,
-        long timeout,
-        boolean skipOnTimeout
-    ) throws IgniteCheckedException {
-        assert timeout > 0 || skipOnTimeout;
-
-        ClusterNode node = ctx.discovery().node(nodeId);
-
-        if (node == null)
-            throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
-
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false);
     }
 
     /**
@@ -1422,7 +1407,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, GridTopic topic, Message msg, byte plc,
         IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC, false);
     }
 
     /**
@@ -1458,7 +1443,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC)
         throws IgniteCheckedException {
-        send(node, topic, -1, msg, plc, false, 0, false, ackC);
+        send(node, topic, -1, msg, plc, false, 0, false, ackC, false);
     }
 
     /**
@@ -1514,10 +1499,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false);
     }
 
-     /**
+    /**
      * Sends a peer deployable user message.
      *
      * @param nodes Destination nodes.
@@ -1525,7 +1510,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg) throws IgniteCheckedException {
-        sendUserMessage(nodes, msg, null, false, 0);
+        sendUserMessage(nodes, msg, null, false, 0, false);
     }
 
     /**
@@ -1536,11 +1521,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param topic Message topic to use.
      * @param ordered Is message ordered?
      * @param timeout Message timeout in milliseconds for ordered messages.
+     * @param async Async flag.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     @SuppressWarnings("ConstantConditions")
     public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg,
-        @Nullable Object topic, boolean ordered, long timeout) throws IgniteCheckedException {
+        @Nullable Object topic, boolean ordered, long timeout, boolean async) throws IgniteCheckedException {
         boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(locNodeId);
 
         byte[] serMsg = null;
@@ -1584,8 +1570,18 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         if (ordered)
             sendOrderedMessage(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true);
-        else if (loc)
-            send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+        else if (loc) {
+            send(F.first(nodes),
+                TOPIC_COMM_USER,
+                TOPIC_COMM_USER.ordinal(),
+                ioMsg,
+                PUBLIC_POOL,
+                false,
+                0,
+                false,
+                null,
+                async);
+        }
         else {
             ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId));
 
@@ -1594,10 +1590,21 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             if (!rmtNodes.isEmpty())
                 send(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
 
-            // Will call local listeners in current thread synchronously, so must go the last
+            // Will call local listeners in current thread synchronously or through pool,
+            // depending async flag, so must go the last
             // to allow remote nodes execute the requested operation in parallel.
-            if (locNode != null)
-                send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+            if (locNode != null) {
+                send(locNode,
+                    TOPIC_COMM_USER,
+                    TOPIC_COMM_USER.ordinal(),
+                    ioMsg,
+                    PUBLIC_POOL,
+                    false,
+                    0,
+                    false,
+                    null,
+                    async);
+            }
         }
     }
 
@@ -1639,35 +1646,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
-     * @param nodeId Destination node.
-     * @param topic Topic to send the message to.
-     * @param msg Message to send.
-     * @param plc Type of processing.
-     * @param timeout Timeout to keep a message on receiving queue.
-     * @param skipOnTimeout Whether message can be skipped on timeout.
-     * @param ackC Ack closure.
-     * @throws IgniteCheckedException Thrown in case of any errors.
-     */
-    public void sendOrderedMessage(
-        UUID nodeId,
-        Object topic,
-        Message msg,
-        byte plc,
-        long timeout,
-        boolean skipOnTimeout,
-        IgniteInClosure<IgniteException> ackC
-    ) throws IgniteCheckedException {
-        assert timeout > 0 || skipOnTimeout;
-
-        ClusterNode node = ctx.discovery().node(nodeId);
-
-        if (node == null)
-            throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
-
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC);
-    }
-
-    /**
      * @param nodes Destination nodes.
      * @param topic Topic to send the message to.
      * @param topicOrd Topic ordinal value.
@@ -1701,7 +1679,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             // messages to one node vs. many.
             if (!nodes.isEmpty()) {
                 for (ClusterNode node : nodes)
-                    send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null);
+                    send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null, false);
             }
             else if (log.isDebugEnabled())
                 log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +
@@ -1929,8 +1907,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (rmv && log.isDebugEnabled())
             log.debug("Removed message listener [topic=" + topic + ", lsnr=" + lsnr + ']');
 
-        if (lsnr instanceof ArrayListener)
-        {
+        if (lsnr instanceof ArrayListener) {
             for (GridMessageListener childLsnr : ((ArrayListener)lsnr).arr)
                 closeListener(childLsnr);
         }
@@ -1942,6 +1919,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
     /**
      * Closes a listener, if applicable.
+     *
      * @param lsnr Listener.
      */
     private void closeListener(GridMessageListener lsnr) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/59ed1d7e/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
index 2039d81..2bc1398 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
@@ -145,7 +145,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
         GridIoManager ioMgr = spy(new TestGridIoManager(ctx));
 
         try {
-            ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, false, 123L);
+            ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, false, 123L, false);
         }
         catch (IgniteCheckedException ignored) {
             // No-op. We are using mocks so real sending is impossible.
@@ -169,7 +169,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
         GridIoManager ioMgr = spy(new TestGridIoManager(ctx));
 
         try {
-            ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, true, 123L);
+            ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, true, 123L, false);
         }
         catch (Exception ignored) {
             // No-op. We are using mocks so real sending is impossible.
@@ -257,4 +257,4 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
             return 0;
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/59ed1d7e/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
index 31b0663..49aab10 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
@@ -58,7 +58,18 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
     public void testLocalServer() throws Exception {
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                localServerInternal();
+                localServerInternal(false);
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocalServerAsync() throws Exception {
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                localServerInternal(true);
             }
         });
     }
@@ -83,7 +94,21 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
 
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                serverClientMessage();
+                serverClientMessage(false);
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServerClientMessageAsync() throws Exception {
+        if (!testsCfg.withClients())
+            return;
+
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                serverClientMessage(true);
             }
         });
     }
@@ -97,7 +122,21 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
 
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                clientClientMessage();
+                clientClientMessage(false);
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientClientMessageAsync() throws Exception {
+        if (!testsCfg.withClients())
+            return;
+
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                clientClientMessage(true);
             }
         });
     }
@@ -111,7 +150,21 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
 
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                clientServerMessage();
+                clientServerMessage(false);
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientServerMessageAsync() throws Exception {
+        if (!testsCfg.withClients())
+            return;
+
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                clientServerMessage(true);
             }
         });
     }
@@ -133,7 +186,18 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
     public void testOrderedMessage() throws Exception {
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                orderedMessage();
+                orderedMessage(false);
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOrderedMessageAsync() throws Exception {
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                orderedMessage(true);
             }
         });
     }
@@ -147,7 +211,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
 
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                clientServerOrderedMessage();
+                clientServerOrderedMessage(false);
             }
         });
     }
@@ -155,13 +219,42 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
     /**
      * @throws Exception If failed.
      */
+    public void testClientServerOrderedMessageAsync() throws Exception {
+        if (!testsCfg.withClients())
+            return;
+
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                clientServerOrderedMessage(true);
+            }
+        });
+    }
+
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testClientClientOrderedMessage() throws Exception {
         if (!testsCfg.withClients())
             return;
 
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                clientClientOrderedMessage();
+                clientClientOrderedMessage(false);
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientClientOrderedMessageAsync() throws Exception {
+        if (!testsCfg.withClients())
+            return;
+
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                clientClientOrderedMessage(true);
             }
         });
     }
@@ -175,16 +268,32 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
 
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                serverClientOrderedMessage();
+                serverClientOrderedMessage(false);
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServerClientOrderedMessageAsync() throws Exception {
+        if (!testsCfg.withClients())
+            return;
+
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                serverClientOrderedMessage(true);
             }
         });
     }
 
     /**
      * Single server test.
+     *
+     * @param async Async message send flag.
      * @throws Exception If failed.
      */
-    private void localServerInternal() throws Exception {
+    private void localServerInternal(boolean async) throws Exception {
         int messages = MSGS;
 
         Ignite ignite = grid(SERVER_NODE_IDX);
@@ -197,7 +306,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
 
         try {
             for (int i = 0; i < messages; i++)
-                sendMessage(ignite, grp, value(i));
+                sendMessage(ignite, grp, value(i), async);
 
             assertTrue(LATCH.await(10, TimeUnit.SECONDS));
 
@@ -238,52 +347,59 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
 
     /**
      * Server sends a message and client receives it.
+     *
+     * @param async Async message send flag.
      * @throws Exception If failed.
      */
-    private void serverClientMessage() throws Exception {
+    private void serverClientMessage(boolean async) throws Exception {
         Ignite ignite = grid(SERVER_NODE_IDX);
 
         ClusterGroup grp = ignite.cluster().forClients();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendMessages(ignite, grp);
+        registerListenerAndSendMessages(ignite, grp, async);
     }
 
     /**
      * Client sends a message and client receives it.
+     *
+     * @param async Async message send flag.
      * @throws Exception If failed.
      */
-    private void clientClientMessage() throws Exception {
+    private void clientClientMessage(boolean async) throws Exception {
         Ignite ignite = grid(CLIENT_NODE_IDX);
 
         ClusterGroup grp = ignite.cluster().forClients();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendMessages(ignite, grp);
+        registerListenerAndSendMessages(ignite, grp, async);
     }
 
     /**
      * Client sends a message and client receives it.
+     *
+     * @param async Async message send flag.
      * @throws Exception If failed.
      */
-    private void clientServerMessage() throws Exception {
+    private void clientServerMessage(boolean async) throws Exception {
         Ignite ignite = grid(CLIENT_NODE_IDX);
 
         ClusterGroup grp = ignite.cluster().forServers();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendMessages(ignite, grp);
+        registerListenerAndSendMessages(ignite, grp, async);
     }
 
     /**
      * @param ignite Ignite.
      * @param grp Cluster group.
+     * @param async Async message send flag.
      * @throws Exception If fail.
      */
-    private void registerListenerAndSendMessages(Ignite ignite, ClusterGroup grp) throws Exception {
+    private void registerListenerAndSendMessages(Ignite ignite, ClusterGroup grp, boolean async) throws Exception {
         int messages = MSGS;
 
         LATCH = new CountDownLatch(grp.nodes().size() * messages);
@@ -292,7 +408,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
 
         try {
             for (int i = 0; i < messages; i++)
-                sendMessage(ignite, grp, value(i));
+                sendMessage(ignite, grp, value(i), async);
 
             assertTrue(LATCH.await(10, TimeUnit.SECONDS));
 
@@ -335,67 +451,68 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
     }
 
     /**
-     *
+     * @param async Async message send flag.
      * @throws Exception If fail.
      */
-    private void orderedMessage() throws Exception {
+    private void orderedMessage(boolean async) throws Exception {
         Ignite ignite = grid(SERVER_NODE_IDX);
 
         ClusterGroup grp = gridCount() > 1 ? ignite.cluster().forRemotes() : ignite.cluster().forLocal();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendOrderedMessages(ignite, grp);
+        registerListenerAndSendOrderedMessages(ignite, grp, async);
     }
 
     /**
-     *
+     * @param async Async message send flag.
      * @throws Exception If fail.
      */
-    private void clientServerOrderedMessage() throws Exception {
+    private void clientServerOrderedMessage(boolean async) throws Exception {
         Ignite ignite = grid(CLIENT_NODE_IDX);
 
         ClusterGroup grp = ignite.cluster().forServers();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendOrderedMessages(ignite, grp);
+        registerListenerAndSendOrderedMessages(ignite, grp, async);
     }
 
     /**
-     *
+     * @param async Async message send flag.
      * @throws Exception If fail.
      */
-    private void clientClientOrderedMessage() throws Exception {
+    private void clientClientOrderedMessage(boolean async) throws Exception {
         Ignite ignite = grid(CLIENT_NODE_IDX);
 
         ClusterGroup grp = ignite.cluster().forClients();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendOrderedMessages(ignite, grp);
+        registerListenerAndSendOrderedMessages(ignite, grp, async);
     }
 
     /**
-     *
+     * @param async Async message send flag.
      * @throws Exception If fail.
      */
-    private void serverClientOrderedMessage() throws Exception {
+    private void serverClientOrderedMessage(boolean async) throws Exception {
         Ignite ignite = grid(SERVER_NODE_IDX);
 
         ClusterGroup grp = ignite.cluster().forClients();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendOrderedMessages(ignite, grp);
+        registerListenerAndSendOrderedMessages(ignite, grp, async);
     }
 
     /**
      * @param ignite Ignite.
      * @param grp Cluster group.
+     * @param async Async message send flag.
      * @throws Exception If fail.
      */
-    private void registerListenerAndSendOrderedMessages(Ignite ignite, ClusterGroup grp) throws Exception {
+    private void registerListenerAndSendOrderedMessages(Ignite ignite, ClusterGroup grp, boolean async) throws Exception {
         int messages = MSGS;
 
         LATCH = new CountDownLatch(grp.nodes().size() * messages);
@@ -403,8 +520,12 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
         UUID opId = ignite.message(grp).remoteListen(MESSAGE_TOPIC, new OrderedMessageListener());
 
         try {
-            for (int i=0; i < messages; i++)
-                ignite.message(grp).sendOrdered(MESSAGE_TOPIC, value(i), 2000);
+            for (int i=0; i < messages; i++){
+                if (async)
+                    ignite.message(grp).withAsync().sendOrdered(MESSAGE_TOPIC, value(i), 2000);
+                else
+                    ignite.message(grp).sendOrdered(MESSAGE_TOPIC, value(i), 2000);
+            }
 
             assertTrue(LATCH.await(10, TimeUnit.SECONDS));
 
@@ -419,9 +540,13 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
      * @param nodeSnd Sender Ignite node.
      * @param grp Cluster group.
      * @param msg Message.
+     * @param async Async message send flag.
      */
-    private void sendMessage(Ignite nodeSnd, ClusterGroup grp, Object msg) {
-        nodeSnd.message(grp).send(MESSAGE_TOPIC, msg);
+    private void sendMessage(Ignite nodeSnd, ClusterGroup grp, Object msg, boolean async) {
+        if (async)
+            nodeSnd.message(grp).withAsync().send(MESSAGE_TOPIC, msg);
+        else
+            nodeSnd.message(grp).send(MESSAGE_TOPIC, msg);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/59ed1d7e/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
index 5a0dfa2..a166c3d 100644
--- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
@@ -24,6 +24,7 @@ import java.io.ObjectOutput;
 import java.io.Serializable;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -36,15 +37,20 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteMessaging;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage;
+import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.typedef.P2;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.IgniteInstanceResource;;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -198,7 +204,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+        TestTcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi();
 
         discoSpi.setIpFinder(ipFinder);
 
@@ -944,7 +950,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
      * @throws Exception If error occurs.
      */
     public void testSendMessageWithExternalClassLoader() throws Exception {
-        URL[] urls = new URL[] { new URL(GridTestProperties.getProperty("p2p.uri.cls")) };
+        URL[] urls = new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))};
 
         ClassLoader extLdr = new URLClassLoader(urls);
 
@@ -1028,6 +1034,8 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
     public void testAsync() throws Exception {
         final AtomicInteger msgCnt = new AtomicInteger();
 
+        TestTcpDiscoverySpi discoSpi = (TestTcpDiscoverySpi)ignite2.configuration().getDiscoverySpi();
+
         assertFalse(ignite2.message().isAsync());
 
         final IgniteMessaging msg = ignite2.message().withAsync();
@@ -1044,6 +1052,8 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
             }
         }, IllegalStateException.class, null);
 
+        discoSpi.blockCustomEvent();
+
         final String topic = "topic";
 
         UUID id = msg.remoteListen(topic, new P2<UUID, Object>() {
@@ -1059,9 +1069,15 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
 
         Assert.assertNull(id);
 
-        IgniteFuture<UUID> fut = msg.future();
+        IgniteFuture<UUID> starFut = msg.future();
+
+        Assert.assertNotNull(starFut);
+
+        U.sleep(500);
 
-        Assert.assertNotNull(fut);
+        Assert.assertFalse(starFut.isDone());
+
+        discoSpi.stopBlock();
 
         GridTestUtils.assertThrows(log, new Callable<Void>() {
             @Override public Void call() throws Exception {
@@ -1071,10 +1087,14 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
             }
         }, IllegalStateException.class, null);
 
-        id = fut.get();
+        id = starFut.get();
 
         Assert.assertNotNull(id);
 
+        Assert.assertTrue(starFut.isDone());
+
+        discoSpi.blockCustomEvent();
+
         message(ignite1.cluster().forRemotes()).send(topic, "msg1");
 
         GridTestUtils.waitForCondition(new PA() {
@@ -1099,8 +1119,16 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
             }
         }, IllegalStateException.class, null);
 
+        U.sleep(500);
+
+        Assert.assertFalse(stopFut.isDone());
+
+        discoSpi.stopBlock();
+
         stopFut.get();
 
+        Assert.assertTrue(stopFut.isDone());
+
         message(ignite1.cluster().forRemotes()).send(topic, "msg2");
 
         U.sleep(1000);
@@ -1109,6 +1137,80 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
     }
 
     /**
+     *
+     */
+    static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** */
+        private boolean blockCustomEvt;
+
+        /** */
+        private final Object mux = new Object();
+
+        /** */
+        private List<DiscoverySpiCustomMessage> blockedMsgs = new ArrayList<>();
+
+        /** {@inheritDoc} */
+        @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
+            synchronized (mux) {
+                if (blockCustomEvt) {
+                    DiscoveryCustomMessage msg0 = GridTestUtils.getFieldValue(msg, "delegate");
+                    if (msg0 instanceof StopRoutineDiscoveryMessage || msg0 instanceof StartRoutineDiscoveryMessage) {
+                        log.info("Block custom message: " + msg0);
+                        blockedMsgs.add(msg);
+
+                        mux.notifyAll();
+                    }
+                    return;
+                }
+            }
+
+            super.sendCustomEvent(msg);
+        }
+
+        /**
+         *
+         */
+        public void blockCustomEvent() {
+            synchronized (mux) {
+                assert blockedMsgs.isEmpty() : blockedMsgs;
+
+                blockCustomEvt = true;
+            }
+        }
+
+        /**
+         * @throws InterruptedException If interrupted.
+         */
+        public void waitCustomEvent() throws InterruptedException {
+            synchronized (mux) {
+                while (blockedMsgs.isEmpty())
+                    mux.wait();
+            }
+        }
+
+        /**
+         *
+         */
+        public void stopBlock() {
+            List<DiscoverySpiCustomMessage> msgs;
+
+            synchronized (this) {
+                msgs = new ArrayList<>(blockedMsgs);
+
+                blockCustomEvt = false;
+
+                blockedMsgs.clear();
+            }
+
+            for (DiscoverySpiCustomMessage msg : msgs) {
+                log.info("Resend blocked message: " + msg);
+
+                super.sendCustomEvent(msg);
+            }
+        }
+    }
+
+    /**
      * Tests that message listener registers only for one oldest node.
      *
      * @throws Exception If an error occurred.
@@ -1152,4 +1254,4 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
 
         assertEquals(1, MSG_CNT.get());
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/59ed1d7e/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
new file mode 100644
index 0000000..75e7d22
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.messaging;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteMessaging;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jsr166.ThreadLocalRandom8;
+import org.junit.Assert;
+
+/**
+ *
+ */
+public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest implements Serializable {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Threads number for multi-thread tests. */
+    private static final int THREADS = 10;
+
+    /** */
+    private final String TOPIC = "topic";
+
+    /** */
+    private final String msgStr = "message";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * Checks if use default mode, local listeners execute in the same thread, 1 node in topology.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSendDefaultMode() throws Exception {
+        Ignite ignite1 = startGrid(1);
+
+        send(ignite1.message(), msgStr, new IgniteBiInClosure<String, Thread> () {
+            @Override public void apply(String msg, Thread thread) {
+                Assert.assertEquals(Thread.currentThread(), thread);
+                Assert.assertEquals(msgStr, msg);
+            }
+        });
+    }
+
+    /**
+     * Checks if use async mode, local listeners execute in another thread, 1 node in topology.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSendAsyncMode() throws Exception {
+        Ignite ignite1 = startGrid(1);
+
+        send(ignite1.message().withAsync(), msgStr,  new IgniteBiInClosure<String, Thread> () {
+            @Override public void apply(String msg, Thread thread) {
+                Assert.assertTrue(!Thread.currentThread().equals(thread));
+                Assert.assertEquals(msgStr, msg);
+            }
+        });
+    }
+
+    /**
+     * Checks if use default mode, local listeners execute in the same thread, 2 nodes in topology.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSendDefaultMode2Nodes() throws Exception {
+        Ignite ignite1 = startGrid(1);
+        Ignite ignite2 = startGrid(2);
+
+        sendWith2Nodes(ignite2, ignite1.message(), msgStr, new IgniteBiInClosure<String, Thread> () {
+            @Override public  void apply(String msg, Thread thread) {
+                Assert.assertEquals(Thread.currentThread(), thread);
+                Assert.assertEquals(msgStr, msg);
+            }
+        });
+    }
+
+    /**
+     * Checks if use async mode, local listeners execute in another thread, 2 nodes in topology.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSendAsyncMode2Node() throws Exception {
+        Ignite ignite1 = startGrid(1);
+        Ignite ignite2 = startGrid(2);
+
+        sendWith2Nodes(ignite2, ignite1.message().withAsync(), msgStr,  new IgniteBiInClosure<String, Thread> () {
+            @Override public  void apply(String msg, Thread thread) {
+                Assert.assertTrue(!Thread.currentThread().equals(thread));
+                Assert.assertEquals(msgStr, msg);
+            }
+        });
+    }
+
+    /**
+     * Checks that sendOrdered works in thread pool, 1 node in topology.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSendOrderedDefaultMode() throws Exception {
+        Ignite ignite1 = startGrid(1);
+
+        final List<String> msgs = orderedMessages();
+
+        sendOrdered(ignite1.message(), msgs, new IgniteBiInClosure< List<String>,  List<Thread>> () {
+            @Override public void apply(List<String> received, List<Thread> threads) {
+                assertFalse(threads.contains(Thread.currentThread()));
+                assertTrue(msgs.equals(received));
+            }
+        });
+    }
+
+    /**
+     * Checks that sendOrdered work in thread pool, 1 node in topology.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSendOrderedAsyncMode() throws Exception {
+        Ignite ignite1 = startGrid(1);
+
+        final List<String> msgs = orderedMessages();
+
+        sendOrdered(ignite1.message().withAsync(), msgs, new IgniteBiInClosure< List<String>,  List<Thread>> () {
+            @Override public void apply(List<String> received, List<Thread> threads) {
+                assertFalse(threads.contains(Thread.currentThread()));
+                assertTrue(msgs.equals(received));
+            }
+        });
+    }
+
+    /**
+     * Checks that sendOrdered work in thread pool, 2 nodes in topology.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSendOrderedDefaultMode2Node() throws Exception {
+        Ignite ignite1 = startGrid(1);
+        Ignite ignite2 = startGrid(2);
+
+        final List<String> msgs = orderedMessages();
+
+        sendOrderedWith2Node(ignite2, ignite1.message(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() {
+            @Override public void apply(List<String> received, List<Thread> threads) {
+                assertFalse(threads.contains(Thread.currentThread()));
+                assertTrue(msgs.equals(received));
+            }
+        });
+    }
+
+    /**
+     * Checks that sendOrdered work in thread pool, 2 nodes in topology.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSendOrderedAsyncMode2Node() throws Exception {
+        Ignite ignite1 = startGrid(1);
+        Ignite ignite2 = startGrid(2);
+
+        final List<String> msgs = orderedMessages();
+
+        sendOrderedWith2Node(ignite2, ignite1.message().withAsync(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() {
+            @Override public void apply(List<String> received, List<Thread> threads) {
+                assertFalse(threads.contains(Thread.currentThread()));
+                assertTrue(msgs.equals(received));
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSendOrderedDefaultModeMultiThreads() throws Exception {
+        Ignite ignite = startGrid(1);
+
+        sendOrderedMultiThreads(ignite.message());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSendOrderedAsyncModeMultiThreads() throws Exception {
+        Ignite ignite = startGrid(1);
+
+        sendOrderedMultiThreads(ignite.message().withAsync());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSendOrderedDefaultModeMultiThreadsWith2Node() throws Exception {
+        Ignite ignite1 = startGrid(1);
+        Ignite ignite2 = startGrid(2);
+
+        sendOrderedMultiThreadsWith2Node(ignite2, ignite1.message());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSendOrderedAsyncModeMultiThreadsWith2Node() throws Exception {
+        Ignite ignite1 = startGrid(1);
+        Ignite ignite2 = startGrid(2);
+
+        sendOrderedMultiThreadsWith2Node(ignite2, ignite1.message().withAsync());
+    }
+
+    /**
+     * @param ignite2 Second node.
+     * @param ignMsg IgniteMessage.
+     * @throws Exception If failed.
+     */
+    private void sendOrderedMultiThreadsWith2Node(
+            final Ignite ignite2,
+            final IgniteMessaging ignMsg
+    ) throws Exception {
+        final ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap();
+        final ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap();
+
+        final List<String> msgs = orderedMessages();
+
+        sendOrderedMultiThreadsWith2Node(ignite2, ignMsg, expMsg, actlMsg, msgs);
+
+    }
+
+    /**
+     * @param ignMsg IgniteMessaging.
+     * @throws Exception If failed.
+     */
+    private void sendOrderedMultiThreads(
+            final IgniteMessaging ignMsg
+    ) throws Exception {
+        final ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap();
+        final ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap();
+
+        final List<String> msgs = orderedMessages();
+
+        sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs);
+    }
+
+    /**
+     * @param ignite2 Second node.
+     * @param ignMsg Ignite for send message.
+     * @param expMsg Expected messages map.
+     * @param actlMsg Actual message map.
+     * @param msgs List of messages.
+     * @throws Exception If failed.
+     */
+    private void sendOrderedMultiThreadsWith2Node(
+            final Ignite ignite2,
+            final IgniteMessaging ignMsg,
+            final ConcurrentMap<String, List<String>> expMsg,
+            final ConcurrentMap<String, List<String>> actlMsg,
+            final List<String> msgs
+    ) throws Exception {
+        final CountDownLatch latch = new CountDownLatch(THREADS * msgs.size());
+
+        final ConcurrentMap<String, List<String>> actlMsgNode2 = Maps.newConcurrentMap();
+
+        ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() {
+            @Override public boolean apply(UUID uuid, Message msg) {
+                actlMsgNode2.putIfAbsent(msg.threadName, Lists.<String>newArrayList());
+
+                actlMsgNode2.get(msg.threadName).add(msg.msg);
+
+                latch.countDown();
+
+                return true;
+            }
+        });
+
+        sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs);
+
+        latch.await();
+
+        assertEquals(expMsg.size(), actlMsgNode2.size());
+
+        for (Map.Entry<String, List<String>> entry : expMsg.entrySet())
+            assertTrue(actlMsgNode2.get(entry.getKey()).equals(entry.getValue()));
+    }
+
+    /**
+     * @param ignMsg Ignite for send message.
+     * @param expMsg Expected messages map.
+     * @param actlMsg Actual message map.
+     * @param msgs List of messages.
+     * @throws Exception If failed.
+     */
+    private void sendOrderedMultiThreads(
+            final IgniteMessaging ignMsg,
+            final ConcurrentMap<String, List<String>> expMsg,
+            final ConcurrentMap<String, List<String>> actlMsg,
+            final List<String> msgs
+    ) throws Exception {
+        final CountDownLatch latch = new CountDownLatch(THREADS * msgs.size());
+
+        ignMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() {
+            @Override public boolean apply(UUID uuid, Message msg) {
+                actlMsg.putIfAbsent(msg.threadName, Lists.<String>newArrayList());
+
+                actlMsg.get(msg.threadName).add(msg.msg);
+
+                latch.countDown();
+
+                return true;
+            }
+        });
+
+        for (int i = 0; i < THREADS; i++)
+            new Thread(new Runnable() {
+                @Override public void run() {
+                    String thdName = Thread.currentThread().getName();
+
+                    List<String> exp = Lists.newArrayList();
+
+                    expMsg.put(thdName, exp);
+
+                    for (String msg : msgs) {
+                        exp.add(msg);
+
+                        ignMsg.sendOrdered(TOPIC, new Message(thdName, msg), 1000);
+                    }
+
+                }
+            }).start();
+
+        latch.await();
+
+        assertEquals(expMsg.size(), actlMsg.size());
+
+        for (Map.Entry<String, List<String>> entry : expMsg.entrySet())
+            assertTrue(actlMsg.get(entry.getKey()).equals(entry.getValue()));
+    }
+
+    /**
+     * @param ignite2 Second node.
+     * @param igniteMsg Ignite message.
+     * @param msgStr    Message string.
+     * @param cls       Callback for compare result.
+     * @throws Exception If failed.
+     */
+    private void sendWith2Nodes(
+            final Ignite ignite2,
+            final IgniteMessaging igniteMsg,
+            final String msgStr,
+            final IgniteBiInClosure<String, Thread>  cls
+    ) throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
+            @Override public boolean apply(UUID uuid, String msg) {
+                Assert.assertEquals(msgStr, msg);
+
+                latch.countDown();
+
+                return true;
+            }
+        });
+
+        send(igniteMsg, msgStr, cls);
+
+        latch.await();
+    }
+
+    /**
+     * @param igniteMsg Ignite messaging.
+     * @param msgStr    Message string.
+     * @param cls       Callback for compare result.
+     * @throws Exception If failed.
+     */
+    private void send(
+           final IgniteMessaging igniteMsg,
+           final String msgStr,
+           final IgniteBiInClosure<String, Thread> cls
+    ) throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        final AtomicReference<Thread> thread = new AtomicReference<>();
+        final AtomicReference<String> val = new AtomicReference<>();
+
+        igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
+            @Override public boolean apply(UUID uuid, String msgStr) {
+                thread.set(Thread.currentThread());
+
+                val.set(msgStr);
+
+                latch.countDown();
+
+                return true;
+            }
+        });
+
+        igniteMsg.send(TOPIC, msgStr);
+
+        latch.await();
+
+        cls.apply(val.get(), thread.get());
+    }
+
+    /**
+     * @param ignite2 Second node.
+     * @param igniteMsg Ignite message.
+     * @param msgs messages for send.
+     * @param cls  Callback for compare result.
+     * @throws Exception If failed.
+     */
+    private void sendOrderedWith2Node(
+            final Ignite ignite2,
+            final IgniteMessaging igniteMsg,
+            final List<String> msgs,
+            final IgniteBiInClosure<List<String>, List<Thread>> cls
+    ) throws Exception {
+        final CountDownLatch latch = new CountDownLatch(msgs.size());
+
+        final List<String> received = Lists.newArrayList();
+
+        ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
+            @Override public boolean apply(UUID uuid, String msg) {
+                received.add(msg);
+
+                latch.countDown();
+
+                return true;
+            }
+        });
+
+        sendOrdered(igniteMsg, msgs, cls);
+
+        latch.await();
+
+        assertTrue(msgs.equals(received));
+    }
+
+    /**
+     * @param igniteMsg Ignite message.
+     * @param msgs  messages for send.
+     * @param cls Callback for compare result.
+     * @throws Exception If failed.
+     */
+    private<T> void sendOrdered(
+            final IgniteMessaging igniteMsg,
+            final List<T> msgs,
+            final IgniteBiInClosure<List<T>,List<Thread>> cls
+    ) throws Exception {
+        final CountDownLatch latch = new CountDownLatch(msgs.size());
+
+        final List<T> received = Lists.newArrayList();
+        final List<Thread> threads = Lists.newArrayList();
+
+        for (T msg : msgs)
+            igniteMsg.sendOrdered(TOPIC, msg, 1000);
+
+        igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, T>() {
+            @Override public boolean apply(UUID uuid, T s) {
+                received.add(s);
+
+                threads.add(Thread.currentThread());
+
+                latch.countDown();
+
+                return true;
+            }
+        });
+
+        latch.await();
+
+        cls.apply(received, threads);
+    }
+
+    /**
+     * @return List of ordered messages
+     */
+    private List<String> orderedMessages() {
+        final List<String> msgs = Lists.newArrayList();
+
+        for (int i = 0; i < 1000; i++)
+            msgs.add(String.valueOf(ThreadLocalRandom8.current().nextInt()));
+
+        return msgs;
+    }
+
+    /**
+     *
+     */
+    private static class Message implements Serializable{
+        /** Thread name. */
+        private final String threadName;
+
+        /** Message. */
+        private final String msg;
+
+        /**
+         * @param threadName Thread name.
+         * @param msg Message.
+         */
+        private Message(String threadName, String msg) {
+            this.threadName = threadName;
+            this.msg = msg;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/59ed1d7e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 9e20d2a..688edf7 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -56,6 +56,7 @@ import org.apache.ignite.marshaller.DynamicProxySerializationMultiJvmSelfTest;
 import org.apache.ignite.marshaller.MarshallerContextSelfTest;
 import org.apache.ignite.messaging.GridMessagingNoPeerClassLoadingSelfTest;
 import org.apache.ignite.messaging.GridMessagingSelfTest;
+import org.apache.ignite.messaging.IgniteMessagingSendAsyncTest;
 import org.apache.ignite.messaging.IgniteMessagingWithClientTest;
 import org.apache.ignite.plugin.security.SecurityPermissionSetBuilderTest;
 import org.apache.ignite.spi.GridSpiLocalHostInjectionTest;
@@ -101,6 +102,7 @@ public class IgniteBasicTestSuite extends TestSuite {
         suite.addTest(new TestSuite(GridSelfTest.class));
         suite.addTest(new TestSuite(ClusterGroupHostsSelfTest.class));
         suite.addTest(new TestSuite(IgniteMessagingWithClientTest.class));
+        suite.addTest(new TestSuite(IgniteMessagingSendAsyncTest.class));
 
         GridTestUtils.addTestIfNeeded(suite, ClusterGroupSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridMessagingSelfTest.class, ignoredTests);

http://git-wip-us.apache.org/repos/asf/ignite/blob/59ed1d7e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index 10f18a6..3db68c4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -147,7 +147,7 @@ public class HadoopShuffle extends HadoopComponent {
         if (msg instanceof Message)
             ctx.kernalContext().io().send(node, GridTopic.TOPIC_HADOOP_MSG, (Message)msg, GridIoPolicy.PUBLIC_POOL);
         else
-            ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0);
+            ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0, false);
     }
 
     /**
@@ -298,4 +298,4 @@ public class HadoopShuffle extends HadoopComponent {
     public GridUnsafeMemory memory() {
         return mem;
     }
-}
+}
\ No newline at end of file


[2/7] ignite git commit: Implemented support for enforce join order flag. (cherry picked from commit a7f77d4)

Posted by av...@apache.org.
Implemented support for enforce join order flag.
(cherry picked from commit a7f77d4)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bc9fcf7f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bc9fcf7f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bc9fcf7f

Branch: refs/heads/master
Commit: bc9fcf7f6aaeba826df6a35f1a9aacb17a562337
Parents: bea863f
Author: Alexey Kuznetsov <ak...@gridgain.com>
Authored: Wed Mar 1 22:09:40 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Thu Mar 2 14:12:05 2017 +0700

----------------------------------------------------------------------
 .../internal/visor/query/VisorQueryArgV3.java   | 51 ++++++++++++++++++++
 .../internal/visor/query/VisorQueryJob.java     |  6 +--
 .../resources/META-INF/classnames.properties    |  1 +
 3 files changed, 55 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bc9fcf7f/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArgV3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArgV3.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArgV3.java
new file mode 100644
index 0000000..f32c00a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArgV3.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.query;
+
+/**
+ * Arguments for {@link VisorQueryTask}.
+ */
+public class VisorQueryArgV3 extends VisorQueryArgV2 {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Enforce join order flag. */
+    private final boolean enforceJoinOrder;
+
+    /**
+     * @param cacheName Cache name for query.
+     * @param qryTxt Query text.
+     * @param distributedJoins If {@code true} then distributed joins enabled.
+     * @param enforceJoinOrder If {@code true} then enforce join order.
+     * @param loc Flag whether to execute query locally.
+     * @param pageSize Result batch size.
+     */
+    public VisorQueryArgV3(String cacheName, String qryTxt,
+        boolean distributedJoins, boolean enforceJoinOrder, boolean loc, int pageSize) {
+        super(cacheName, qryTxt, distributedJoins, loc, pageSize);
+
+        this.enforceJoinOrder = enforceJoinOrder;
+    }
+
+    /**
+     * @return Enforce join order flag.
+     */
+    public boolean enforceJoinOrder() {
+        return enforceJoinOrder;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc9fcf7f/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
index c66b2dd..1ac90ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
@@ -131,9 +131,8 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten
                 if (scanWithFilter) {
                     boolean caseSensitive = qryTxt.startsWith(SCAN_CACHE_WITH_FILTER_CASE_SENSITIVE);
 
-                    String ptrn = caseSensitive
-                        ? qryTxt.substring(SCAN_CACHE_WITH_FILTER_CASE_SENSITIVE.length())
-                        : qryTxt.substring(SCAN_CACHE_WITH_FILTER.length());
+                    String ptrn = qryTxt.substring(
+                        caseSensitive ? SCAN_CACHE_WITH_FILTER_CASE_SENSITIVE.length() : SCAN_CACHE_WITH_FILTER.length());
 
                     filter = new VisorQueryScanSubstringFilter(caseSensitive, ptrn);
                 }
@@ -162,6 +161,7 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten
                 qry.setPageSize(arg.pageSize());
                 qry.setLocal(arg.local());
                 qry.setDistributedJoins(arg instanceof VisorQueryArgV2 && ((VisorQueryArgV2)arg).distributedJoins());
+                qry.setEnforceJoinOrder(arg instanceof VisorQueryArgV3 && ((VisorQueryArgV3)arg).enforceJoinOrder());
 
                 long start = U.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc9fcf7f/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index c0a6407..8cfed3a 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1822,6 +1822,7 @@ org.apache.ignite.internal.visor.node.VisorSpisConfiguration
 org.apache.ignite.internal.visor.node.VisorTransactionConfiguration
 org.apache.ignite.internal.visor.query.VisorQueryArg
 org.apache.ignite.internal.visor.query.VisorQueryArgV2
+org.apache.ignite.internal.visor.query.VisorQueryArgV3
 org.apache.ignite.internal.visor.query.VisorQueryCleanupTask
 org.apache.ignite.internal.visor.query.VisorQueryCleanupTask$VisorQueryCleanupJob
 org.apache.ignite.internal.visor.query.VisorQueryField


[6/7] ignite git commit: Merge branch 'ignite-1.9' into ignite-1.9.1

Posted by av...@apache.org.
Merge branch 'ignite-1.9' into ignite-1.9.1


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8362fe72
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8362fe72
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8362fe72

Branch: refs/heads/master
Commit: 8362fe720779de5d3e5c7d99ec1bdb6191a38a06
Parents: bc9fcf7 0be9273
Author: devozerov <vo...@gridgain.com>
Authored: Thu Mar 2 17:24:59 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Mar 2 17:24:59 2017 +0300

----------------------------------------------------------------------
 doap_Ignite.rdf                                 |   1 -
 .../managers/communication/GridIoManager.java   | 107 +++++++------------
 .../communication/GridIoManagerSelfTest.java    |   2 +-
 .../ant/beautifier/GridJavadocAntTask.java      |   1 +
 parent/pom.xml                                  |   3 +-
 5 files changed, 40 insertions(+), 74 deletions(-)
----------------------------------------------------------------------



[4/7] ignite git commit: IGNITE-4753: DOAP: Node.JS => Node.js (cherry picked from commit 72c2ef6)

Posted by av...@apache.org.
IGNITE-4753: DOAP: Node.JS => Node.js
(cherry picked from commit 72c2ef6)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bcdba2a8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bcdba2a8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bcdba2a8

Branch: refs/heads/master
Commit: bcdba2a87856a645c97087e11b5ce90683e16b01
Parents: 59ed1d7
Author: Denis Magda <dm...@gridgain.com>
Authored: Tue Feb 28 02:16:26 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Mar 2 14:23:30 2017 +0300

----------------------------------------------------------------------
 doap_Ignite.rdf | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bcdba2a8/doap_Ignite.rdf
----------------------------------------------------------------------
diff --git a/doap_Ignite.rdf b/doap_Ignite.rdf
index fb51a15..afe4035 100644
--- a/doap_Ignite.rdf
+++ b/doap_Ignite.rdf
@@ -36,7 +36,6 @@
     <programming-language>Java</programming-language>
     <programming-language>C#</programming-language>
     <programming-language>C++</programming-language>
-    <programming-language>Node.JS</programming-language>
     <programming-language>SQL</programming-language>
     <programming-language>JDBC</programming-language>
     <programming-language>ODBC</programming-language>