You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/14 13:11:46 UTC
[27/34] ignite git commit: ignite-3727 added test for sendOrdered +
javadoc (behavior sendOrdered)
ignite-3727 added test for sendOrdered + javadoc (behavior sendOrdered)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/de59444b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/de59444b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/de59444b
Branch: refs/heads/ignite-3727-2
Commit: de59444b235ae325bb35976dbd0f211ce8dfb69c
Parents: 60b030c
Author: DmitriyGovorukhin <dg...@gridgain.com>
Authored: Wed Sep 14 11:18:10 2016 +0300
Committer: DmitriyGovorukhin <dg...@gridgain.com>
Committed: Wed Sep 14 11:18:10 2016 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteMessaging.java | 4 +-
.../messaging/IgniteMessagingSendAsyncTest.java | 147 ++++++++++++++++++-
2 files changed, 149 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/de59444b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
index 00a4fc8..b0cbe1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
@@ -103,7 +103,9 @@ public interface IgniteMessaging extends IgniteAsyncSupport {
/**
* Sends given message with specified topic to the nodes in the underlying cluster group. Messages sent with
* this method will arrive in the same order they were sent. Note that if a topic is used
- * for ordered messages, then it cannot be reused for non-ordered messages.
+ * for ordered messages, then it cannot be reused for non-ordered messages. Note that if a local listener
+ * is registered on this topic, all messages are processed in a thread pool, so the calling thread is never
+ * blocked when you invoke {@code sendOrdered}, regardless of the mode used (default or {@link #withAsync()}).
* <p>
* The {@code timeout} parameter specifies how long an out-of-order message will stay in a queue,
* waiting for messages that are ordered ahead of it to arrive. If timeout expires, then all ordered
http://git-wip-us.apache.org/repos/asf/ignite/blob/de59444b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
index 66c9ed0..4cfff38 100644
--- a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.messaging;
+import com.google.common.collect.Lists;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.lang.IgniteBiInClosure;
@@ -25,6 +26,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Assert;
import java.io.Serializable;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
@@ -65,7 +67,6 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme
Assert.assertEquals(msgStr, msg);
}
});
-
}
/**
@@ -116,6 +117,72 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme
}
/**
+ * Checks that {@code sendOrdered} messages are processed in the thread pool. 1 node in topology.
+ */
+    public void testSendOrderedDefaultMode() throws Exception {
+        Ignite ignite1 = startGrid(1);
+
+        final List<String> msgs = orderedMsg();
+
+        sendOrdered(ignite1.message(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() {
+            @Override public void apply(List<String> received, List<Thread> threads) {
+                // No listener invocation may have happened on the thread that runs this callback.
+                assertFalse(threads.contains(Thread.currentThread()));
+
+                // assertEquals reports both lists on mismatch, unlike assertTrue(a.equals(b)).
+                Assert.assertEquals(msgs, received);
+            }
+        });
+    }
+
+    /**
+     * Checks that listeners for {@code sendOrdered} messages are not invoked in the thread that runs the
+     * verification callback and that messages are received in send order, using the messaging facade
+     * obtained through {@code withAsync()}. 1 node in topology.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSendOrderedAsyncMode() throws Exception {
+        Ignite ignite1 = startGrid(1);
+
+        final List<String> msgs = orderedMsg();
+
+        sendOrdered(ignite1.message().withAsync(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() {
+            @Override public void apply(List<String> received, List<Thread> threads) {
+                // No listener invocation may have happened on the thread that runs this callback.
+                assertFalse(threads.contains(Thread.currentThread()));
+
+                // assertEquals reports both lists on mismatch, unlike assertTrue(a.equals(b)).
+                Assert.assertEquals(msgs, received);
+            }
+        });
+    }
+
+    /**
+     * Checks that listeners for {@code sendOrdered} messages are not invoked in the thread that runs the
+     * verification callback and that messages are received in send order, using the default messaging
+     * facade. 2 nodes in topology: messages are sent from node 1 and also listened for on node 2.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSendOrderedDefaultMode2Node() throws Exception {
+        Ignite ignite1 = startGrid(1);
+        Ignite ignite2 = startGrid(2);
+
+        final List<String> msgs = orderedMsg();
+
+        sendOrderedWith2Node(ignite2, ignite1.message(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() {
+            @Override public void apply(List<String> received, List<Thread> threads) {
+                // No listener invocation may have happened on the thread that runs this callback.
+                assertFalse(threads.contains(Thread.currentThread()));
+
+                // assertEquals reports both lists on mismatch, unlike assertTrue(a.equals(b)).
+                Assert.assertEquals(msgs, received);
+            }
+        });
+    }
+
+    /**
+     * Checks that listeners for {@code sendOrdered} messages are not invoked in the thread that runs the
+     * verification callback and that messages are received in send order, using the messaging facade
+     * obtained through {@code withAsync()}. 2 nodes in topology: messages are sent from node 1 and also
+     * listened for on node 2.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSendOrderedAsyncMode2Node() throws Exception {
+        Ignite ignite1 = startGrid(1);
+        Ignite ignite2 = startGrid(2);
+
+        final List<String> msgs = orderedMsg();
+
+        sendOrderedWith2Node(ignite2, ignite1.message().withAsync(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() {
+            @Override public void apply(List<String> received, List<Thread> threads) {
+                // No listener invocation may have happened on the thread that runs this callback.
+                assertFalse(threads.contains(Thread.currentThread()));
+
+                // assertEquals reports both lists on mismatch, unlike assertTrue(a.equals(b)).
+                Assert.assertEquals(msgs, received);
+            }
+        });
+    }
+
+ /**
* @param igniteMsg Ignite message.
* @param msgStr Message string.
* @param cls Callback for compare result.
@@ -171,4 +238,82 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme
cls.apply(val.get(), thread.get());
}
+
+    /**
+     * Registers a listener on {@code ignite2}, sends the messages in order through {@code igniteMsg}
+     * (delegating to {@code sendOrdered}, which also runs {@code cls} against the locally received
+     * messages) and then verifies that the listener on {@code ignite2} received every message in the
+     * order it was sent.
+     *
+     * @param ignite2 Ignite 2.
+     * @param igniteMsg Ignite message.
+     * @param msgs messages for send.
+     * @param cls Callback for compare result.
+     * @throws Exception If failed.
+     */
+    private void sendOrderedWith2Node(
+        final Ignite ignite2,
+        final IgniteMessaging igniteMsg,
+        final List<String> msgs,
+        final IgniteBiInClosure<List<String>, List<Thread>> cls
+    ) throws Exception {
+        final CountDownLatch latch = new CountDownLatch(msgs.size());
+
+        final List<String> received = Lists.newArrayList();
+
+        ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
+            @Override public boolean apply(UUID uuid, String msg) {
+                // Record the message before counting down so the waiting thread sees it after await().
+                received.add(msg);
+
+                latch.countDown();
+
+                return true;
+            }
+        });
+
+        sendOrdered(igniteMsg, msgs, cls);
+
+        // Bounded wait: a lost message must fail the test instead of hanging it forever.
+        assertTrue("Timed out waiting for ordered messages on the second node, pending: " + latch.getCount(),
+            latch.await(30, java.util.concurrent.TimeUnit.SECONDS));
+
+        // assertEquals reports both lists on mismatch, unlike assertTrue(a.equals(b)).
+        Assert.assertEquals(msgs, received);
+    }
+
+    /**
+     * Registers a local listener on {@code TOPIC} through {@code igniteMsg}, sends every message with
+     * {@code sendOrdered}, waits until the listener has been invoked once per message and then passes the
+     * received messages and the threads that ran the listener to {@code cls}.
+     *
+     * @param igniteMsg Ignite message.
+     * @param msgs messages for send.
+     * @param cls Callback for compare result.
+     * @throws InterruptedException If the wait for the messages was interrupted.
+     */
+    private void sendOrdered(
+        final IgniteMessaging igniteMsg,
+        final List<String> msgs,
+        final IgniteBiInClosure<List<String>, List<Thread>> cls
+    ) throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(msgs.size());
+
+        final List<String> received = Lists.newArrayList();
+        final List<Thread> threads = Lists.newArrayList();
+
+        // Subscribe before sending: otherwise delivery depends on the listener being registered
+        // before the 1000 ms ordered-message timeout passed to sendOrdered below expires.
+        igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
+            @Override public boolean apply(UUID uuid, String s) {
+                // Record message and thread before counting down so the waiting thread sees them after await().
+                received.add(s);
+                threads.add(Thread.currentThread());
+
+                latch.countDown();
+
+                return true;
+            }
+        });
+
+        for (String msg : msgs)
+            igniteMsg.sendOrdered(TOPIC, msg, 1000);
+
+        // Bounded wait: a lost message must fail the test instead of hanging it forever.
+        assertTrue("Timed out waiting for ordered messages, pending: " + latch.getCount(),
+            latch.await(30, java.util.concurrent.TimeUnit.SECONDS));
+
+        cls.apply(received, threads);
+    }
+
+    /**
+     * Builds the payload for the ordering tests: the decimal strings "0" through "999" in ascending order.
+     *
+     * @return List ordered messages
+     */
+    private List<String> orderedMsg() {
+        final List<String> res = Lists.newArrayList();
+
+        int idx = 0;
+
+        while (idx < 1000) {
+            res.add(String.valueOf(idx));
+
+            idx++;
+        }
+
+        return res;
+    }
}