You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/14 13:11:46 UTC

[27/34] ignite git commit: ignite-3727 added test for sendOrdered + javadoc (behavior sendOrdered)

ignite-3727  added test for sendOrdered + javadoc (behavior sendOrdered)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/de59444b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/de59444b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/de59444b

Branch: refs/heads/ignite-3727-2
Commit: de59444b235ae325bb35976dbd0f211ce8dfb69c
Parents: 60b030c
Author: DmitriyGovorukhin <dg...@gridgain.com>
Authored: Wed Sep 14 11:18:10 2016 +0300
Committer: DmitriyGovorukhin <dg...@gridgain.com>
Committed: Wed Sep 14 11:18:10 2016 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteMessaging.java |   4 +-
 .../messaging/IgniteMessagingSendAsyncTest.java | 147 ++++++++++++++++++-
 2 files changed, 149 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/de59444b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
index 00a4fc8..b0cbe1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
@@ -103,7 +103,9 @@ public interface IgniteMessaging extends IgniteAsyncSupport {
     /**
      * Sends given message with specified topic to the nodes in the underlying cluster group. Messages sent with
      * this method will arrive in the same order they were sent. Note that if a topic is used
-     * for ordered messages, then it cannot be reused for non-ordered messages.
+     * for ordered messages, then it cannot be reused for non-ordered messages. Note if you have local listener
+     * on this topic, all messages will process through thread pool, and current thread will never be blocked
+     * when you invoke sendOrdered, no matter which mode you used (default or {@link #withAsync()}).
      * <p>
      * The {@code timeout} parameter specifies how long an out-of-order message will stay in a queue,
      * waiting for messages that are ordered ahead of it to arrive. If timeout expires, then all ordered

http://git-wip-us.apache.org/repos/asf/ignite/blob/de59444b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
index 66c9ed0..4cfff38 100644
--- a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.messaging;
 
+import com.google.common.collect.Lists;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteMessaging;
 import org.apache.ignite.lang.IgniteBiInClosure;
@@ -25,6 +26,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Assert;
 
 import java.io.Serializable;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
@@ -65,7 +67,6 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme
                 Assert.assertEquals(msgStr, msg);
             }
         });
-
     }
 
     /**
@@ -116,6 +117,72 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme
     }
 
     /**
+     * Test for check, that SendOrdered work in our thread pool. 1 node in topology.
+     */
+    public void testSendOrderedDefaultMode() throws Exception {
+        Ignite ignite1 = startGrid(1);
+
+        final List<String> msgs = orderedMsg();
+
+        sendOrdered(ignite1.message(), msgs, new IgniteBiInClosure< List<String>,  List<Thread>> () {
+            @Override public void apply(List<String> received, List<Thread> threads) {
+                assertFalse(threads.contains(Thread.currentThread()));
+                assertTrue(msgs.equals(received));
+            }
+        });
+    }
+
+    /**
+     * Test for check, that SendOrdered work in our thread pool. 1 node in topology.
+     */
+    public void testSendOrderedAsyncMode() throws Exception {
+        Ignite ignite1 = startGrid(1);
+
+        final List<String> msgs = orderedMsg();
+
+        sendOrdered(ignite1.message().withAsync(), msgs, new IgniteBiInClosure< List<String>,  List<Thread>> () {
+            @Override public void apply(List<String> received, List<Thread> threads) {
+                assertFalse(threads.contains(Thread.currentThread()));
+                assertTrue(msgs.equals(received));
+            }
+        });
+    }
+
+    /**
+     * Test for check, that SendOrdered work in our thread pool. 2 node in topology.
+     */
+    public void testSendOrderedDefaultMode2Node() throws Exception {
+        Ignite ignite1 = startGrid(1);
+        Ignite ignite2 = startGrid(2);
+
+        final List<String> msgs = orderedMsg();
+
+        sendOrderedWith2Node(ignite2, ignite1.message(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() {
+            @Override public void apply(List<String> received, List<Thread> threads) {
+                assertFalse(threads.contains(Thread.currentThread()));
+                assertTrue(msgs.equals(received));
+            }
+        });
+    }
+
+    /**
+     * Test for check, that SendOrdered work in our thread pool. 2 node in topology.
+     */
+    public void testSendOrderedAsyncMode2Node() throws Exception {
+        Ignite ignite1 = startGrid(1);
+        Ignite ignite2 = startGrid(2);
+
+        final List<String> msgs = orderedMsg();
+
+        sendOrderedWith2Node(ignite2, ignite1.message().withAsync(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() {
+            @Override public void apply(List<String> received, List<Thread> threads) {
+                assertFalse(threads.contains(Thread.currentThread()));
+                assertTrue(msgs.equals(received));
+            }
+        });
+    }
+
+    /**
      * @param igniteMsg Ignite message.
      * @param msgStr    Message string.
      * @param cls       Callback for compare result.
@@ -171,4 +238,82 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme
 
         cls.apply(val.get(), thread.get());
     }
+
+    /**
+     * @param ignite2 Ignite 2.
+     * @param igniteMsg Ignite message.
+     * @param msgs messages for send.
+     * @param cls  Callback for compare result.
+     */
+    private void sendOrderedWith2Node(
+            final Ignite ignite2,
+            final IgniteMessaging igniteMsg,
+            final List<String> msgs,
+            final IgniteBiInClosure<List<String>,List<Thread>> cls
+    ) throws Exception {
+
+        final CountDownLatch latch = new CountDownLatch(msgs.size());
+
+        final List<String> received = Lists.newArrayList();
+
+        ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
+            @Override public boolean apply(UUID uuid, String msg) {
+                received.add(msg);
+                latch.countDown();
+                return true;
+            }
+        });
+
+        sendOrdered(igniteMsg, msgs, cls);
+
+        latch.await();
+
+        assertTrue(msgs.equals(received));
+
+    }
+
+    /**
+     * @param igniteMsg Ignite message.
+     * @param msgs  messages for send.
+     * @param cls Callback for compare result.
+     */
+    private void sendOrdered(
+            final IgniteMessaging igniteMsg,
+            final List<String> msgs,
+            final IgniteBiInClosure<List<String>,List<Thread>> cls
+    ) throws InterruptedException {
+
+        final CountDownLatch latch = new CountDownLatch(msgs.size());
+
+        final List<String> received = Lists.newArrayList();
+        final List<Thread> threads = Lists.newArrayList();
+
+        for (String msg : msgs)
+            igniteMsg.sendOrdered(TOPIC, msg, 1000);
+
+        igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
+            @Override public boolean apply(UUID uuid, String s) {
+                received.add(s);
+                threads.add(Thread.currentThread());
+                latch.countDown();
+                return true;
+            }
+        });
+
+        latch.await();
+
+        cls.apply(received, threads);
+    }
+
+    /**
+     * @return List ordered messages
+     */
+    private List<String> orderedMsg() {
+        final List<String> msgs = Lists.newArrayList();
+
+        for (int i = 0; i < 1000; i++)
+            msgs.add("" + i);
+
+        return msgs;
+    }
 }