You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/13 08:45:48 UTC
[27/50] [abbrv] ignite git commit: ignite-3727 Restored fix and fixed
issue with wrong method called in GridIoManager.
ignite-3727 Restored fix and fixed issue with wrong method called in GridIoManager.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/59ed1d7e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/59ed1d7e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/59ed1d7e
Branch: refs/heads/ignite-2.0
Commit: 59ed1d7ec08a1bb827e7354bb5dca80b86944933
Parents: 9d811f1
Author: sboikov <sb...@gridgain.com>
Authored: Thu Mar 2 13:28:20 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Mar 2 13:28:20 2017 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteMessaging.java | 13 +-
.../ignite/internal/IgniteMessagingImpl.java | 8 +-
.../internal/managers/GridManagerAdapter.java | 2 +-
.../managers/communication/GridIoManager.java | 130 ++---
.../communication/GridIoManagerSelfTest.java | 6 +-
...niteMessagingConfigVariationFullApiTest.java | 195 +++++--
.../ignite/messaging/GridMessagingSelfTest.java | 116 +++-
.../messaging/IgniteMessagingSendAsyncTest.java | 544 +++++++++++++++++++
.../ignite/testsuites/IgniteBasicTestSuite.java | 2 +
.../hadoop/shuffle/HadoopShuffle.java | 4 +-
10 files changed, 890 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/59ed1d7e/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
index ff52ed8..e64ded5 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
@@ -77,6 +77,10 @@ public interface IgniteMessaging extends IgniteAsyncSupport {
/**
* Sends given message with specified topic to the nodes in the underlying cluster group.
+ * <p>
+ * By default all local listeners will be executed in the calling thread, or if you use
+ * {@link #withAsync()}, listeners will execute in the public thread pool (in this case it is the user's
+ * responsibility to implement back-pressure and limit number of concurrently executed async messages).
*
* @param topic Topic to send to, {@code null} for default topic.
* @param msg Message to send.
@@ -87,6 +91,10 @@ public interface IgniteMessaging extends IgniteAsyncSupport {
/**
* Sends given messages with the specified topic to the nodes in the underlying cluster group.
+ * <p>
+ * By default all local listeners will be executed in the calling thread, or if you use
+ * {@link #withAsync()}, listeners will execute in the public thread pool (in this case it is the user's
+ * responsibility to implement back-pressure and limit number of concurrently executed async messages).
*
* @param topic Topic to send to, {@code null} for default topic.
* @param msgs Messages to send. Order of the sending is undefined. If the method produces
@@ -99,7 +107,8 @@ public interface IgniteMessaging extends IgniteAsyncSupport {
/**
* Sends given message with specified topic to the nodes in the underlying cluster group. Messages sent with
* this method will arrive in the same order they were sent. Note that if a topic is used
- * for ordered messages, then it cannot be reused for non-ordered messages.
+ * for ordered messages, then it cannot be reused for non-ordered messages. Note that local listeners
+ * are always executed in the public thread pool, regardless of whether default or {@link #withAsync()} mode is used.
* <p>
* The {@code timeout} parameter specifies how long an out-of-order message will stay in a queue,
* waiting for messages that are ordered ahead of it to arrive. If timeout expires, then all ordered
@@ -162,4 +171,4 @@ public interface IgniteMessaging extends IgniteAsyncSupport {
/** {@inheritDoc} */
@Override IgniteMessaging withAsync();
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/59ed1d7e/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
index 6b33aa5..541fad4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
@@ -86,7 +86,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging>
if (snapshot.isEmpty())
throw U.emptyTopologyException();
- ctx.io().sendUserMessage(snapshot, msg, topic, false, 0);
+ ctx.io().sendUserMessage(snapshot, msg, topic, false, 0, isAsync());
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -111,7 +111,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging>
for (Object msg : msgs) {
A.notNull(msg, "msg");
- ctx.io().sendUserMessage(snapshot, msg, topic, false, 0);
+ ctx.io().sendUserMessage(snapshot, msg, topic, false, 0, isAsync());
}
}
catch (IgniteCheckedException e) {
@@ -137,7 +137,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging>
if (timeout == 0)
timeout = ctx.config().getNetworkTimeout();
- ctx.io().sendUserMessage(snapshot, msg, topic, true, timeout);
+ ctx.io().sendUserMessage(snapshot, msg, topic, true, timeout, false);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -254,4 +254,4 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging>
protected Object readResolve() throws ObjectStreamException {
return prj.message();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/59ed1d7e/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 584cc56..5992eda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -390,7 +390,7 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
if (msg instanceof Message)
ctx.io().send(node, topic, (Message)msg, SYSTEM_POOL);
else
- ctx.io().sendUserMessage(Collections.singletonList(node), msg, topic, false, 0);
+ ctx.io().sendUserMessage(Collections.singletonList(node), msg, topic, false, 0, false);
}
catch (IgniteCheckedException e) {
throw unwrapException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/59ed1d7e/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 7ef7bc0..0b7c790 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -188,6 +188,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/** */
private final AtomicLong ioTestId = new AtomicLong();
+ /** No-op runnable. */
+ private static final IgniteRunnable NOOP = new IgniteRunnable() {
+ @Override public void run() {
+ // No-op.
+ }
+ };
+
/**
* @param ctx Grid kernal context.
*/
@@ -1237,6 +1244,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param timeout Timeout.
* @param skipOnTimeout Whether message can be skipped on timeout.
* @param ackC Ack closure.
+ * @param async If {@code true} message for local node will be processed in pool, otherwise in current thread.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
private void send(
@@ -1248,11 +1256,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
boolean ordered,
long timeout,
boolean skipOnTimeout,
- IgniteInClosure<IgniteException> ackC
+ IgniteInClosure<IgniteException> ackC,
+ boolean async
) throws IgniteCheckedException {
assert node != null;
assert topic != null;
assert msg != null;
+ assert !async || msg instanceof GridIoUserMessage : msg; // Async execution was added only for IgniteMessaging.
GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
@@ -1266,6 +1276,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (ordered)
processOrderedMessage(locNodeId, ioMsg, plc, null);
+ else if (async)
+ processRegularMessage(locNodeId, ioMsg, plc, NOOP);
else
processRegularMessage0(ioMsg, locNodeId);
@@ -1323,7 +1335,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (node == null)
throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
- send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
+ send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
}
/**
@@ -1335,7 +1347,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
*/
public void send(ClusterNode node, Object topic, Message msg, byte plc)
throws IgniteCheckedException {
- send(node, topic, -1, msg, plc, false, 0, false, null);
+ send(node, topic, -1, msg, plc, false, 0, false, null, false);
}
/**
@@ -1347,7 +1359,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
*/
public void send(ClusterNode node, GridTopic topic, Message msg, byte plc)
throws IgniteCheckedException {
- send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
+ send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
}
/**
@@ -1360,7 +1372,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
*/
public void send(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc)
throws IgniteCheckedException {
- send(node, topic, topicOrd, msg, plc, false, 0, false, null);
+ send(node, topic, topicOrd, msg, plc, false, 0, false, null, false);
}
/**
@@ -1382,34 +1394,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
) throws IgniteCheckedException {
assert timeout > 0 || skipOnTimeout;
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
- }
-
- /**
- * @param nodeId Destination node.
- * @param topic Topic to send the message to.
- * @param msg Message to send.
- * @param plc Type of processing.
- * @param timeout Timeout to keep a message on receiving queue.
- * @param skipOnTimeout Whether message can be skipped on timeout.
- * @throws IgniteCheckedException Thrown in case of any errors.
- */
- public void sendOrderedMessage(
- UUID nodeId,
- Object topic,
- Message msg,
- byte plc,
- long timeout,
- boolean skipOnTimeout
- ) throws IgniteCheckedException {
- assert timeout > 0 || skipOnTimeout;
-
- ClusterNode node = ctx.discovery().node(nodeId);
-
- if (node == null)
- throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
-
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
+ send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false);
}
/**
@@ -1422,7 +1407,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
*/
public void send(ClusterNode node, GridTopic topic, Message msg, byte plc,
IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
- send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC);
+ send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC, false);
}
/**
@@ -1458,7 +1443,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
*/
public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC)
throws IgniteCheckedException {
- send(node, topic, -1, msg, plc, false, 0, false, ackC);
+ send(node, topic, -1, msg, plc, false, 0, false, ackC, false);
}
/**
@@ -1514,10 +1499,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
) throws IgniteCheckedException {
assert timeout > 0 || skipOnTimeout;
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC);
+ send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false);
}
- /**
+ /**
* Sends a peer deployable user message.
*
* @param nodes Destination nodes.
@@ -1525,7 +1510,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @throws IgniteCheckedException Thrown in case of any errors.
*/
public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg) throws IgniteCheckedException {
- sendUserMessage(nodes, msg, null, false, 0);
+ sendUserMessage(nodes, msg, null, false, 0, false);
}
/**
@@ -1536,11 +1521,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param topic Message topic to use.
* @param ordered Is message ordered?
* @param timeout Message timeout in milliseconds for ordered messages.
+ * @param async Async flag.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
@SuppressWarnings("ConstantConditions")
public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg,
- @Nullable Object topic, boolean ordered, long timeout) throws IgniteCheckedException {
+ @Nullable Object topic, boolean ordered, long timeout, boolean async) throws IgniteCheckedException {
boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(locNodeId);
byte[] serMsg = null;
@@ -1584,8 +1570,18 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (ordered)
sendOrderedMessage(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true);
- else if (loc)
- send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+ else if (loc) {
+ send(F.first(nodes),
+ TOPIC_COMM_USER,
+ TOPIC_COMM_USER.ordinal(),
+ ioMsg,
+ PUBLIC_POOL,
+ false,
+ 0,
+ false,
+ null,
+ async);
+ }
else {
ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId));
@@ -1594,10 +1590,21 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (!rmtNodes.isEmpty())
send(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
- // Will call local listeners in current thread synchronously, so must go the last
+ // Will call local listeners in current thread synchronously or through pool,
+ // depending on the async flag, so must go last
// to allow remote nodes execute the requested operation in parallel.
- if (locNode != null)
- send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+ if (locNode != null) {
+ send(locNode,
+ TOPIC_COMM_USER,
+ TOPIC_COMM_USER.ordinal(),
+ ioMsg,
+ PUBLIC_POOL,
+ false,
+ 0,
+ false,
+ null,
+ async);
+ }
}
}
@@ -1639,35 +1646,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
/**
- * @param nodeId Destination node.
- * @param topic Topic to send the message to.
- * @param msg Message to send.
- * @param plc Type of processing.
- * @param timeout Timeout to keep a message on receiving queue.
- * @param skipOnTimeout Whether message can be skipped on timeout.
- * @param ackC Ack closure.
- * @throws IgniteCheckedException Thrown in case of any errors.
- */
- public void sendOrderedMessage(
- UUID nodeId,
- Object topic,
- Message msg,
- byte plc,
- long timeout,
- boolean skipOnTimeout,
- IgniteInClosure<IgniteException> ackC
- ) throws IgniteCheckedException {
- assert timeout > 0 || skipOnTimeout;
-
- ClusterNode node = ctx.discovery().node(nodeId);
-
- if (node == null)
- throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
-
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC);
- }
-
- /**
* @param nodes Destination nodes.
* @param topic Topic to send the message to.
* @param topicOrd Topic ordinal value.
@@ -1701,7 +1679,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
// messages to one node vs. many.
if (!nodes.isEmpty()) {
for (ClusterNode node : nodes)
- send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null);
+ send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null, false);
}
else if (log.isDebugEnabled())
log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +
@@ -1929,8 +1907,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (rmv && log.isDebugEnabled())
log.debug("Removed message listener [topic=" + topic + ", lsnr=" + lsnr + ']');
- if (lsnr instanceof ArrayListener)
- {
+ if (lsnr instanceof ArrayListener) {
for (GridMessageListener childLsnr : ((ArrayListener)lsnr).arr)
closeListener(childLsnr);
}
@@ -1942,6 +1919,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/**
* Closes a listener, if applicable.
+ *
* @param lsnr Listener.
*/
private void closeListener(GridMessageListener lsnr) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/59ed1d7e/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
index 2039d81..2bc1398 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
@@ -145,7 +145,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
GridIoManager ioMgr = spy(new TestGridIoManager(ctx));
try {
- ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, false, 123L);
+ ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, false, 123L, false);
}
catch (IgniteCheckedException ignored) {
// No-op. We are using mocks so real sending is impossible.
@@ -169,7 +169,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
GridIoManager ioMgr = spy(new TestGridIoManager(ctx));
try {
- ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, true, 123L);
+ ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, true, 123L, false);
}
catch (Exception ignored) {
// No-op. We are using mocks so real sending is impossible.
@@ -257,4 +257,4 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
return 0;
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/59ed1d7e/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
index 31b0663..49aab10 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
@@ -58,7 +58,18 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
public void testLocalServer() throws Exception {
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
- localServerInternal();
+ localServerInternal(false);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLocalServerAsync() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ localServerInternal(true);
}
});
}
@@ -83,7 +94,21 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
- serverClientMessage();
+ serverClientMessage(false);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testServerClientMessageAsync() throws Exception {
+ if (!testsCfg.withClients())
+ return;
+
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ serverClientMessage(true);
}
});
}
@@ -97,7 +122,21 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
- clientClientMessage();
+ clientClientMessage(false);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientClientMessageAsync() throws Exception {
+ if (!testsCfg.withClients())
+ return;
+
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ clientClientMessage(true);
}
});
}
@@ -111,7 +150,21 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
- clientServerMessage();
+ clientServerMessage(false);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientServerMessageAsync() throws Exception {
+ if (!testsCfg.withClients())
+ return;
+
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ clientServerMessage(true);
}
});
}
@@ -133,7 +186,18 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
public void testOrderedMessage() throws Exception {
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
- orderedMessage();
+ orderedMessage(false);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOrderedMessageAsync() throws Exception {
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ orderedMessage(true);
}
});
}
@@ -147,7 +211,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
- clientServerOrderedMessage();
+ clientServerOrderedMessage(false);
}
});
}
@@ -155,13 +219,42 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
/**
* @throws Exception If failed.
*/
+ public void testClientServerOrderedMessageAsync() throws Exception {
+ if (!testsCfg.withClients())
+ return;
+
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ clientServerOrderedMessage(true);
+ }
+ });
+ }
+
+
+ /**
+ * @throws Exception If failed.
+ */
public void testClientClientOrderedMessage() throws Exception {
if (!testsCfg.withClients())
return;
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
- clientClientOrderedMessage();
+ clientClientOrderedMessage(false);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientClientOrderedMessageAsync() throws Exception {
+ if (!testsCfg.withClients())
+ return;
+
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ clientClientOrderedMessage(true);
}
});
}
@@ -175,16 +268,32 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
- serverClientOrderedMessage();
+ serverClientOrderedMessage(false);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testServerClientOrderedMessageAsync() throws Exception {
+ if (!testsCfg.withClients())
+ return;
+
+ runInAllDataModes(new TestRunnable() {
+ @Override public void run() throws Exception {
+ serverClientOrderedMessage(true);
}
});
}
/**
* Single server test.
+ *
+ * @param async Async message send flag.
* @throws Exception If failed.
*/
- private void localServerInternal() throws Exception {
+ private void localServerInternal(boolean async) throws Exception {
int messages = MSGS;
Ignite ignite = grid(SERVER_NODE_IDX);
@@ -197,7 +306,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
try {
for (int i = 0; i < messages; i++)
- sendMessage(ignite, grp, value(i));
+ sendMessage(ignite, grp, value(i), async);
assertTrue(LATCH.await(10, TimeUnit.SECONDS));
@@ -238,52 +347,59 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
/**
* Server sends a message and client receives it.
+ *
+ * @param async Async message send flag.
* @throws Exception If failed.
*/
- private void serverClientMessage() throws Exception {
+ private void serverClientMessage(boolean async) throws Exception {
Ignite ignite = grid(SERVER_NODE_IDX);
ClusterGroup grp = ignite.cluster().forClients();
assert grp.nodes().size() > 0;
- registerListenerAndSendMessages(ignite, grp);
+ registerListenerAndSendMessages(ignite, grp, async);
}
/**
* Client sends a message and client receives it.
+ *
+ * @param async Async message send flag.
* @throws Exception If failed.
*/
- private void clientClientMessage() throws Exception {
+ private void clientClientMessage(boolean async) throws Exception {
Ignite ignite = grid(CLIENT_NODE_IDX);
ClusterGroup grp = ignite.cluster().forClients();
assert grp.nodes().size() > 0;
- registerListenerAndSendMessages(ignite, grp);
+ registerListenerAndSendMessages(ignite, grp, async);
}
/**
* Client sends a message and server receives it.
+ *
+ * @param async Async message send flag.
* @throws Exception If failed.
*/
- private void clientServerMessage() throws Exception {
+ private void clientServerMessage(boolean async) throws Exception {
Ignite ignite = grid(CLIENT_NODE_IDX);
ClusterGroup grp = ignite.cluster().forServers();
assert grp.nodes().size() > 0;
- registerListenerAndSendMessages(ignite, grp);
+ registerListenerAndSendMessages(ignite, grp, async);
}
/**
* @param ignite Ignite.
* @param grp Cluster group.
+ * @param async Async message send flag.
* @throws Exception If fail.
*/
- private void registerListenerAndSendMessages(Ignite ignite, ClusterGroup grp) throws Exception {
+ private void registerListenerAndSendMessages(Ignite ignite, ClusterGroup grp, boolean async) throws Exception {
int messages = MSGS;
LATCH = new CountDownLatch(grp.nodes().size() * messages);
@@ -292,7 +408,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
try {
for (int i = 0; i < messages; i++)
- sendMessage(ignite, grp, value(i));
+ sendMessage(ignite, grp, value(i), async);
assertTrue(LATCH.await(10, TimeUnit.SECONDS));
@@ -335,67 +451,68 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
}
/**
- *
+ * @param async Async message send flag.
* @throws Exception If fail.
*/
- private void orderedMessage() throws Exception {
+ private void orderedMessage(boolean async) throws Exception {
Ignite ignite = grid(SERVER_NODE_IDX);
ClusterGroup grp = gridCount() > 1 ? ignite.cluster().forRemotes() : ignite.cluster().forLocal();
assert grp.nodes().size() > 0;
- registerListenerAndSendOrderedMessages(ignite, grp);
+ registerListenerAndSendOrderedMessages(ignite, grp, async);
}
/**
- *
+ * @param async Async message send flag.
* @throws Exception If fail.
*/
- private void clientServerOrderedMessage() throws Exception {
+ private void clientServerOrderedMessage(boolean async) throws Exception {
Ignite ignite = grid(CLIENT_NODE_IDX);
ClusterGroup grp = ignite.cluster().forServers();
assert grp.nodes().size() > 0;
- registerListenerAndSendOrderedMessages(ignite, grp);
+ registerListenerAndSendOrderedMessages(ignite, grp, async);
}
/**
- *
+ * @param async Async message send flag.
* @throws Exception If fail.
*/
- private void clientClientOrderedMessage() throws Exception {
+ private void clientClientOrderedMessage(boolean async) throws Exception {
Ignite ignite = grid(CLIENT_NODE_IDX);
ClusterGroup grp = ignite.cluster().forClients();
assert grp.nodes().size() > 0;
- registerListenerAndSendOrderedMessages(ignite, grp);
+ registerListenerAndSendOrderedMessages(ignite, grp, async);
}
/**
- *
+ * @param async Async message send flag.
* @throws Exception If fail.
*/
- private void serverClientOrderedMessage() throws Exception {
+ private void serverClientOrderedMessage(boolean async) throws Exception {
Ignite ignite = grid(SERVER_NODE_IDX);
ClusterGroup grp = ignite.cluster().forClients();
assert grp.nodes().size() > 0;
- registerListenerAndSendOrderedMessages(ignite, grp);
+ registerListenerAndSendOrderedMessages(ignite, grp, async);
}
/**
* @param ignite Ignite.
* @param grp Cluster group.
+ * @param async Async message send flag.
* @throws Exception If fail.
*/
- private void registerListenerAndSendOrderedMessages(Ignite ignite, ClusterGroup grp) throws Exception {
+ private void registerListenerAndSendOrderedMessages(Ignite ignite, ClusterGroup grp, boolean async) throws Exception {
int messages = MSGS;
LATCH = new CountDownLatch(grp.nodes().size() * messages);
@@ -403,8 +520,12 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
UUID opId = ignite.message(grp).remoteListen(MESSAGE_TOPIC, new OrderedMessageListener());
try {
- for (int i=0; i < messages; i++)
- ignite.message(grp).sendOrdered(MESSAGE_TOPIC, value(i), 2000);
+ for (int i=0; i < messages; i++){
+ if (async)
+ ignite.message(grp).withAsync().sendOrdered(MESSAGE_TOPIC, value(i), 2000);
+ else
+ ignite.message(grp).sendOrdered(MESSAGE_TOPIC, value(i), 2000);
+ }
assertTrue(LATCH.await(10, TimeUnit.SECONDS));
@@ -419,9 +540,13 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
* @param nodeSnd Sender Ignite node.
* @param grp Cluster group.
* @param msg Message.
+ * @param async Async message send flag.
*/
- private void sendMessage(Ignite nodeSnd, ClusterGroup grp, Object msg) {
- nodeSnd.message(grp).send(MESSAGE_TOPIC, msg);
+ private void sendMessage(Ignite nodeSnd, ClusterGroup grp, Object msg, boolean async) {
+ if (async)
+ nodeSnd.message(grp).withAsync().send(MESSAGE_TOPIC, msg);
+ else
+ nodeSnd.message(grp).send(MESSAGE_TOPIC, msg);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/59ed1d7e/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
index 5a0dfa2..a166c3d 100644
--- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
@@ -24,6 +24,7 @@ import java.io.ObjectOutput;
import java.io.Serializable;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -36,15 +37,20 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage;
+import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.IgniteInstanceResource;;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -198,7 +204,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+ TestTcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi();
discoSpi.setIpFinder(ipFinder);
@@ -944,7 +950,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
* @throws Exception If error occurs.
*/
public void testSendMessageWithExternalClassLoader() throws Exception {
- URL[] urls = new URL[] { new URL(GridTestProperties.getProperty("p2p.uri.cls")) };
+ URL[] urls = new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))};
ClassLoader extLdr = new URLClassLoader(urls);
@@ -1028,6 +1034,8 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
public void testAsync() throws Exception {
final AtomicInteger msgCnt = new AtomicInteger();
+ TestTcpDiscoverySpi discoSpi = (TestTcpDiscoverySpi)ignite2.configuration().getDiscoverySpi();
+
assertFalse(ignite2.message().isAsync());
final IgniteMessaging msg = ignite2.message().withAsync();
@@ -1044,6 +1052,8 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
}
}, IllegalStateException.class, null);
+ discoSpi.blockCustomEvent();
+
final String topic = "topic";
UUID id = msg.remoteListen(topic, new P2<UUID, Object>() {
@@ -1059,9 +1069,15 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
Assert.assertNull(id);
- IgniteFuture<UUID> fut = msg.future();
+ IgniteFuture<UUID> starFut = msg.future();
+
+ Assert.assertNotNull(starFut);
+
+ U.sleep(500);
- Assert.assertNotNull(fut);
+ Assert.assertFalse(starFut.isDone());
+
+ discoSpi.stopBlock();
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
@@ -1071,10 +1087,14 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
}
}, IllegalStateException.class, null);
- id = fut.get();
+ id = starFut.get();
Assert.assertNotNull(id);
+ Assert.assertTrue(starFut.isDone());
+
+ discoSpi.blockCustomEvent();
+
message(ignite1.cluster().forRemotes()).send(topic, "msg1");
GridTestUtils.waitForCondition(new PA() {
@@ -1099,8 +1119,16 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
}
}, IllegalStateException.class, null);
+ U.sleep(500);
+
+ Assert.assertFalse(stopFut.isDone());
+
+ discoSpi.stopBlock();
+
stopFut.get();
+ Assert.assertTrue(stopFut.isDone());
+
message(ignite1.cluster().forRemotes()).send(topic, "msg2");
U.sleep(1000);
@@ -1109,6 +1137,80 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
}
/**
+ *
+ */
+ static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** {@code true} while start/stop routine messages must be held back; guarded by {@code mux}. */
+ private boolean blockCustomEvt;
+
+ /** Monitor guarding {@code blockCustomEvt} and {@code blockedMsgs}. */
+ private final Object mux = new Object();
+
+ /** Messages held back since the last {@code blockCustomEvent()}, in arrival order; guarded by {@code mux}. */
+ private final List<DiscoverySpiCustomMessage> blockedMsgs = new ArrayList<>();
+
+ /** {@inheritDoc} */
+ @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
+ synchronized (mux) {
+ if (blockCustomEvt) {
+ DiscoveryCustomMessage msg0 = GridTestUtils.getFieldValue(msg, "delegate");
+ if (msg0 instanceof StopRoutineDiscoveryMessage || msg0 instanceof StartRoutineDiscoveryMessage) {
+ log.info("Block custom message: " + msg0);
+ blockedMsgs.add(msg);
+
+ mux.notifyAll(); // Wake up waitCustomEvent() callers.
+ return; // Held until stopBlock(); any other message type falls through and is sent as usual.
+ }
+ }
+ }
+
+ super.sendCustomEvent(msg);
+ }
+
+ /**
+ * Starts holding back start/stop routine messages; nothing may be left over from a previous blocking round.
+ */
+ public void blockCustomEvent() {
+ synchronized (mux) {
+ assert blockedMsgs.isEmpty() : blockedMsgs;
+
+ blockCustomEvt = true;
+ }
+ }
+
+ /**
+ * Waits until at least one message has been held back.
+ *
+ * @throws InterruptedException If interrupted.
+ */
+ public void waitCustomEvent() throws InterruptedException {
+ synchronized (mux) {
+ while (blockedMsgs.isEmpty())
+ mux.wait();
+ }
+ }
+
+ /**
+ * Stops blocking and re-sends every held-back message, in arrival order, outside the monitor.
+ */
+ public void stopBlock() {
+ List<DiscoverySpiCustomMessage> msgs;
+
+ synchronized (mux) { // Must be the same monitor sendCustomEvent() uses for this state.
+ msgs = new ArrayList<>(blockedMsgs);
+ blockCustomEvt = false;
+ blockedMsgs.clear();
+ }
+
+ for (DiscoverySpiCustomMessage msg : msgs) {
+ log.info("Resend blocked message: " + msg);
+
+ super.sendCustomEvent(msg);
+ }
+ }
+ }
+
+ /**
* Tests that message listener registers only for one oldest node.
*
* @throws Exception If an error occurred.
@@ -1152,4 +1254,4 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
assertEquals(1, MSG_CNT.get());
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/59ed1d7e/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
new file mode 100644
index 0000000..75e7d22
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.messaging;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteMessaging;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jsr166.ThreadLocalRandom8;
+import org.junit.Assert;
+
+/**
+ *
+ */
+public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest implements Serializable {
+ /** */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Threads number for multi-thread tests. */
+ private static final int THREADS = 10;
+
+ /** */
+ private final String TOPIC = "topic";
+
+ /** */
+ private final String msgStr = "message";
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration nodeCfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi disco = (TcpDiscoverySpi)nodeCfg.getDiscoverySpi();
+ disco.setIpFinder(ipFinder); // Every node of this test gets the same static IP finder instance.
+ return nodeCfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids(); // Each test in this class starts its own nodes via startGrid(); stop them before the next test.
+
+ super.afterTest();
+ }
+
+ /**
+ * Verifies that in default (sync) mode the local listener runs in the sending thread; single-node topology.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSendDefaultMode() throws Exception {
+ IgniteMessaging messaging = startGrid(1).message();
+
+ send(messaging, msgStr, new IgniteBiInClosure<String, Thread>() {
+ @Override public void apply(String rcvdMsg, Thread lsnrThread) {
+ Assert.assertEquals(Thread.currentThread(), lsnrThread);
+ Assert.assertEquals(msgStr, rcvdMsg);
+ }
+ });
+ }
+
+ /**
+ * Verifies that in async mode the local listener runs in a thread other than the sender's; single-node topology.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSendAsyncMode() throws Exception {
+ Ignite ignite1 = startGrid(1);
+
+ send(ignite1.message().withAsync(), msgStr, new IgniteBiInClosure<String, Thread>() {
+ @Override public void apply(String msg, Thread thread) {
+ Assert.assertNotSame("Listener must not run in the sending thread", Thread.currentThread(), thread);
+ Assert.assertEquals(msgStr, msg);
+ }
+ });
+ }
+
+ /**
+ * Verifies that in default (sync) mode the local listener runs in the sending thread; two-node topology.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSendDefaultMode2Nodes() throws Exception {
+ Ignite sender = startGrid(1);
+ Ignite rcvr = startGrid(2);
+
+ sendWith2Nodes(rcvr, sender.message(), msgStr, new IgniteBiInClosure<String, Thread>() {
+ @Override public void apply(String rcvdMsg, Thread lsnrThread) {
+ Assert.assertEquals(Thread.currentThread(), lsnrThread);
+ Assert.assertEquals(msgStr, rcvdMsg);
+ }
+ });
+ }
+
+ /**
+ * Verifies that in async mode the local listener runs in a thread other than the sender's; two-node topology.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSendAsyncMode2Node() throws Exception {
+ Ignite ignite1 = startGrid(1);
+ Ignite ignite2 = startGrid(2);
+
+ sendWith2Nodes(ignite2, ignite1.message().withAsync(), msgStr, new IgniteBiInClosure<String, Thread>() {
+ @Override public void apply(String msg, Thread thread) {
+ Assert.assertNotSame("Listener must not run in the sending thread", Thread.currentThread(), thread);
+ Assert.assertEquals(msgStr, msg);
+ }
+ });
+ }
+
+ /**
+ * Verifies that sendOrdered in default mode is handled off the caller thread and in send order; single node.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSendOrderedDefaultMode() throws Exception {
+ Ignite ignite1 = startGrid(1);
+
+ final List<String> msgs = orderedMessages();
+
+ sendOrdered(ignite1.message(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() {
+ @Override public void apply(List<String> received, List<Thread> threads) {
+ assertFalse(threads.contains(Thread.currentThread()));
+ assertEquals(msgs, received);
+ }
+ });
+ }
+
+ /**
+ * Verifies that sendOrdered in async mode is handled off the caller thread and in send order; single node.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSendOrderedAsyncMode() throws Exception {
+ Ignite ignite1 = startGrid(1);
+
+ final List<String> msgs = orderedMessages();
+
+ sendOrdered(ignite1.message().withAsync(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() {
+ @Override public void apply(List<String> received, List<Thread> threads) {
+ assertFalse(threads.contains(Thread.currentThread()));
+ assertEquals(msgs, received);
+ }
+ });
+ }
+
+ /**
+ * Verifies that sendOrdered in default mode is handled off the caller thread and in send order; two nodes.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSendOrderedDefaultMode2Node() throws Exception {
+ Ignite ignite1 = startGrid(1);
+ Ignite ignite2 = startGrid(2);
+
+ final List<String> msgs = orderedMessages();
+
+ sendOrderedWith2Node(ignite2, ignite1.message(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() {
+ @Override public void apply(List<String> received, List<Thread> threads) {
+ assertFalse(threads.contains(Thread.currentThread()));
+ assertEquals(msgs, received);
+ }
+ });
+ }
+
+ /**
+ * Verifies that sendOrdered in async mode is handled off the caller thread and in send order; two nodes.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSendOrderedAsyncMode2Node() throws Exception {
+ Ignite ignite1 = startGrid(1);
+ Ignite ignite2 = startGrid(2);
+
+ final List<String> msgs = orderedMessages();
+
+ sendOrderedWith2Node(ignite2, ignite1.message().withAsync(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() {
+ @Override public void apply(List<String> received, List<Thread> threads) {
+ assertFalse(threads.contains(Thread.currentThread()));
+ assertEquals(msgs, received);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSendOrderedDefaultModeMultiThreads() throws Exception {
+ IgniteMessaging syncMessaging = startGrid(1).message();
+
+ sendOrderedMultiThreads(syncMessaging);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSendOrderedAsyncModeMultiThreads() throws Exception {
+ IgniteMessaging asyncMessaging = startGrid(1).message().withAsync();
+
+ sendOrderedMultiThreads(asyncMessaging);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSendOrderedDefaultModeMultiThreadsWith2Node() throws Exception {
+ Ignite sender = startGrid(1);
+ Ignite secondNode = startGrid(2);
+
+ sendOrderedMultiThreadsWith2Node(secondNode, sender.message());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSendOrderedAsyncModeMultiThreadsWith2Node() throws Exception {
+ Ignite sender = startGrid(1);
+ Ignite secondNode = startGrid(2);
+
+ sendOrderedMultiThreadsWith2Node(secondNode, sender.message().withAsync());
+ }
+
+ /**
+ * Runs the two-node multi-threaded ordered-send scenario with fresh bookkeeping maps and a random payload.
+ *
+ * @param ignite2 Second node.
+ * @param ignMsg Messaging facade used for sending.
+ * @throws Exception If failed.
+ */
+ private void sendOrderedMultiThreadsWith2Node(
+ final Ignite ignite2,
+ final IgniteMessaging ignMsg
+ ) throws Exception {
+ final ConcurrentMap<String, List<String>> sentByThread = Maps.newConcurrentMap();
+ final ConcurrentMap<String, List<String>> rcvdByThread = Maps.newConcurrentMap();
+
+ final List<String> payload = orderedMessages();
+ sendOrderedMultiThreadsWith2Node(ignite2, ignMsg, sentByThread, rcvdByThread, payload);
+ }
+
+ /**
+ * Runs the multi-threaded ordered-send scenario with fresh bookkeeping maps and a random payload.
+ *
+ * @param ignMsg Messaging facade used for sending.
+ * @throws Exception If failed.
+ */
+ private void sendOrderedMultiThreads(final IgniteMessaging ignMsg) throws Exception {
+ final ConcurrentMap<String, List<String>> sentByThread = Maps.newConcurrentMap();
+ final ConcurrentMap<String, List<String>> rcvdByThread = Maps.newConcurrentMap();
+
+ final List<String> payload = orderedMessages();
+
+ sendOrderedMultiThreads(ignMsg, sentByThread, rcvdByThread, payload);
+ }
+
+ /**
+ * Runs the multi-threaded ordered send and additionally checks per-thread delivery order on the second node.
+ *
+ * @param ignite2 Second node.
+ * @param ignMsg Messaging facade used for sending.
+ * @param expMsg Expected messages map (sender thread name to sent messages).
+ * @param actlMsg Actual messages map (sender thread name to messages seen by the sender-side listener).
+ * @param msgs List of messages.
+ * @throws Exception If failed.
+ */
+ private void sendOrderedMultiThreadsWith2Node(
+ final Ignite ignite2,
+ final IgniteMessaging ignMsg,
+ final ConcurrentMap<String, List<String>> expMsg,
+ final ConcurrentMap<String, List<String>> actlMsg,
+ final List<String> msgs
+ ) throws Exception {
+ final CountDownLatch latch = new CountDownLatch(THREADS * msgs.size());
+
+ final ConcurrentMap<String, List<String>> actlMsgNode2 = Maps.newConcurrentMap();
+
+ ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() {
+ @Override public boolean apply(UUID uuid, Message msg) {
+ actlMsgNode2.putIfAbsent(msg.threadName, Lists.<String>newArrayList());
+ actlMsgNode2.get(msg.threadName).add(msg.msg);
+
+ latch.countDown();
+ return true;
+ }
+ });
+
+ sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs);
+
+ latch.await();
+
+ assertEquals(expMsg.size(), actlMsgNode2.size());
+
+ for (Map.Entry<String, List<String>> entry : expMsg.entrySet())
+ assertEquals(entry.getValue(), actlMsgNode2.get(entry.getKey())); // Reports both lists; no NPE on a missing key.
+ }
+
+ /**
+ * Sends every message from {@code THREADS} threads via sendOrdered and checks per-thread delivery order locally.
+ *
+ * @param ignMsg Messaging facade used for sending and for the local listener.
+ * @param expMsg Expected messages map (sender thread name to sent messages).
+ * @param actlMsg Actual messages map (sender thread name to received messages).
+ * @param msgs List of messages.
+ * @throws Exception If failed.
+ */
+ private void sendOrderedMultiThreads(
+ final IgniteMessaging ignMsg,
+ final ConcurrentMap<String, List<String>> expMsg,
+ final ConcurrentMap<String, List<String>> actlMsg,
+ final List<String> msgs
+ ) throws Exception {
+ final CountDownLatch latch = new CountDownLatch(THREADS * msgs.size());
+
+ ignMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() {
+ @Override public boolean apply(UUID uuid, Message msg) {
+ actlMsg.putIfAbsent(msg.threadName, Lists.<String>newArrayList());
+ actlMsg.get(msg.threadName).add(msg.msg);
+
+ latch.countDown();
+
+ return true;
+ }
+ });
+
+ for (int i = 0; i < THREADS; i++)
+ new Thread(new Runnable() {
+ @Override public void run() {
+ String thdName = Thread.currentThread().getName();
+
+ List<String> exp = Lists.newArrayList();
+
+ expMsg.put(thdName, exp);
+
+ for (String msg : msgs) {
+ exp.add(msg);
+
+ ignMsg.sendOrdered(TOPIC, new Message(thdName, msg), 1000);
+ }
+ }
+ }).start();
+
+ latch.await();
+
+ assertEquals(expMsg.size(), actlMsg.size());
+
+ for (Map.Entry<String, List<String>> entry : expMsg.entrySet())
+ assertEquals(entry.getValue(), actlMsg.get(entry.getKey())); // Reports both lists; no NPE on a missing key.
+ }
+
+ /**
+ * Single-message send that returns only after the second node's listener has received the message as well.
+ *
+ * @param ignite2 Second node.
+ * @param igniteMsg Messaging facade used for sending.
+ * @param msgStr Message string.
+ * @param cls Callback receiving the message and thread observed by the sender-side listener.
+ * @throws Exception If failed.
+ */
+ private void sendWith2Nodes(
+ final Ignite ignite2,
+ final IgniteMessaging igniteMsg,
+ final String msgStr,
+ final IgniteBiInClosure<String, Thread> cls
+ ) throws Exception {
+ final CountDownLatch rcvdOnNode2 = new CountDownLatch(1);
+
+ ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
+ @Override public boolean apply(UUID ignored, String rcvd) {
+ Assert.assertEquals(msgStr, rcvd);
+ rcvdOnNode2.countDown();
+ return true;
+ }
+ });
+
+ send(igniteMsg, msgStr, cls);
+
+ rcvdOnNode2.await();
+ }
+
+ /**
+ * Registers a local listener, sends one message and hands what the listener observed to {@code cls}.
+ *
+ * @param igniteMsg Ignite messaging.
+ * @param msgStr Message string.
+ * @param cls Callback invoked in the calling thread with the received message and the listener's thread.
+ * @throws Exception If failed.
+ */
+ private void send(
+ final IgniteMessaging igniteMsg,
+ final String msgStr,
+ final IgniteBiInClosure<String, Thread> cls
+ ) throws Exception {
+ final CountDownLatch delivered = new CountDownLatch(1);
+
+ final AtomicReference<Thread> lsnrThread = new AtomicReference<>();
+ final AtomicReference<String> rcvdMsg = new AtomicReference<>();
+
+ igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
+ @Override public boolean apply(UUID ignored, String rcvd) {
+ lsnrThread.set(Thread.currentThread());
+ rcvdMsg.set(rcvd);
+ delivered.countDown();
+
+ return true;
+ }
+ });
+
+ igniteMsg.send(TOPIC, msgStr);
+
+ delivered.await();
+
+ cls.apply(rcvdMsg.get(), lsnrThread.get());
+ }
+
+ /**
+ * Ordered send that additionally checks the second node received every message in send order.
+ *
+ * @param ignite2 Second node.
+ * @param igniteMsg Messaging facade used for sending.
+ * @param msgs Messages to send.
+ * @param cls Callback receiving the messages and threads observed by the sender-side listener.
+ * @throws Exception If failed.
+ */
+ private void sendOrderedWith2Node(
+ final Ignite ignite2,
+ final IgniteMessaging igniteMsg,
+ final List<String> msgs,
+ final IgniteBiInClosure<List<String>, List<Thread>> cls
+ ) throws Exception {
+ final CountDownLatch latch = new CountDownLatch(msgs.size());
+
+ final List<String> received = Lists.newArrayList();
+
+ ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
+ @Override public boolean apply(UUID uuid, String msg) {
+ received.add(msg);
+ latch.countDown();
+ return true;
+ }
+ });
+
+ sendOrdered(igniteMsg, msgs, cls);
+
+ latch.await();
+
+ assertEquals(msgs, received); // Unlike assertTrue(equals), reports both lists on mismatch.
+ }
+
+ /**
+ * Sends all messages with sendOrdered, then registers the local listener and waits for every delivery.
+ *
+ * @param igniteMsg Ignite message.
+ * @param msgs Messages to send.
+ * @param cls Callback invoked in the calling thread with the received messages and the listener threads.
+ * @throws Exception If failed.
+ */
+ private <T> void sendOrdered(
+ final IgniteMessaging igniteMsg,
+ final List<T> msgs,
+ final IgniteBiInClosure<List<T>, List<Thread>> cls
+ ) throws Exception {
+ final CountDownLatch allDelivered = new CountDownLatch(msgs.size());
+
+ final List<T> rcvd = Lists.newArrayList();
+ final List<Thread> lsnrThreads = Lists.newArrayList();
+
+ for (T outgoing : msgs)
+ igniteMsg.sendOrdered(TOPIC, outgoing, 1000);
+
+ igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, T>() {
+ @Override public boolean apply(UUID ignored, T incoming) {
+ rcvd.add(incoming);
+ lsnrThreads.add(Thread.currentThread());
+ allDelivered.countDown();
+
+ return true;
+ }
+ });
+
+ allDelivered.await();
+
+ cls.apply(rcvd, lsnrThreads);
+ }
+
+ /**
+ * @return Mutable list of 1000 random integers rendered as decimal strings, in generation order.
+ */
+ private List<String> orderedMessages() {
+ final List<String> payload = Lists.newArrayList();
+
+ while (payload.size() < 1000)
+ payload.add(Integer.toString(ThreadLocalRandom8.current().nextInt()));
+
+ return payload;
+ }
+
+ /**
+ * Serializable payload pairing a message body with the name of the thread that sent it.
+ */
+ private static class Message implements Serializable {
+ /** Name of the sending thread. */
+ private final String threadName;
+
+ /** Message body. */
+ private final String msg;
+
+ /**
+ * @param threadName Name of the sending thread.
+ * @param msg Message body.
+ */
+ private Message(String threadName, String msg) {
+ this.msg = msg;
+ this.threadName = threadName;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59ed1d7e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 9e20d2a..688edf7 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -56,6 +56,7 @@ import org.apache.ignite.marshaller.DynamicProxySerializationMultiJvmSelfTest;
import org.apache.ignite.marshaller.MarshallerContextSelfTest;
import org.apache.ignite.messaging.GridMessagingNoPeerClassLoadingSelfTest;
import org.apache.ignite.messaging.GridMessagingSelfTest;
+import org.apache.ignite.messaging.IgniteMessagingSendAsyncTest;
import org.apache.ignite.messaging.IgniteMessagingWithClientTest;
import org.apache.ignite.plugin.security.SecurityPermissionSetBuilderTest;
import org.apache.ignite.spi.GridSpiLocalHostInjectionTest;
@@ -101,6 +102,7 @@ public class IgniteBasicTestSuite extends TestSuite {
suite.addTest(new TestSuite(GridSelfTest.class));
suite.addTest(new TestSuite(ClusterGroupHostsSelfTest.class));
suite.addTest(new TestSuite(IgniteMessagingWithClientTest.class));
+ suite.addTest(new TestSuite(IgniteMessagingSendAsyncTest.class));
GridTestUtils.addTestIfNeeded(suite, ClusterGroupSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridMessagingSelfTest.class, ignoredTests);
http://git-wip-us.apache.org/repos/asf/ignite/blob/59ed1d7e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index 10f18a6..3db68c4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -147,7 +147,7 @@ public class HadoopShuffle extends HadoopComponent {
if (msg instanceof Message)
ctx.kernalContext().io().send(node, GridTopic.TOPIC_HADOOP_MSG, (Message)msg, GridIoPolicy.PUBLIC_POOL);
else
- ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0);
+ ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0, false);
}
/**
@@ -298,4 +298,4 @@ public class HadoopShuffle extends HadoopComponent {
public GridUnsafeMemory memory() {
return mem;
}
-}
+}
\ No newline at end of file