You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/14 13:11:47 UTC

[28/34] ignite git commit: ignite-3727 added test for multi-thread sendOrdered

ignite-3727  added test for multi-thread sendOrdered


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c9ed5c65
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c9ed5c65
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c9ed5c65

Branch: refs/heads/ignite-3727-2
Commit: c9ed5c6583e803110a34fc0126ae4e5af56331b1
Parents: de59444
Author: DmitriyGovorukhin <dg...@gridgain.com>
Authored: Wed Sep 14 12:58:20 2016 +0300
Committer: DmitriyGovorukhin <dg...@gridgain.com>
Committed: Wed Sep 14 12:58:20 2016 +0300

----------------------------------------------------------------------
 .../messaging/IgniteMessagingSendAsyncTest.java | 191 ++++++++++++++++++-
 1 file changed, 183 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ed5c65/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
index 4cfff38..73baeba 100644
--- a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
@@ -18,16 +18,20 @@
 package org.apache.ignite.messaging;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteMessaging;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jsr166.ThreadLocalRandom8;
 import org.junit.Assert;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -45,6 +49,9 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme
      */
     private final String msgStr = "message";
 
+    /** Count threads for multi-thread test */
+    private final int threads = 10;
+
     /**
      * {@inheritDoc}
      */
@@ -183,6 +190,174 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme
     }
 
     /**
+     */
+    public void testSendOrderedDefaultModeMultiThreads() throws Exception {
+        Ignite ignite = startGrid(1);
+
+        sendOrderedMultiThreads(ignite.message());
+    }
+
+    /**
+     */
+    public void testSendOrderedAsyncModeMultiThreads() throws Exception {
+        Ignite ignite = startGrid(1);
+
+        sendOrderedMultiThreads(ignite.message().withAsync());
+    }
+
+    /**
+     */
+    public void testSendOrderedDefaultModeMultiThreadsWith2Node() throws Exception {
+        Ignite ignite1 = startGrid(1);
+        Ignite ignite2 = startGrid(2);
+
+        sendOrderedMultiThreadsWith2Node(ignite2, ignite1.message());
+    }
+
+    /**
+     */
+    public void testSendOrderedAsyncModeMultiThreadsWith2Node() throws Exception {
+        Ignite ignite1 = startGrid(1);
+        Ignite ignite2 = startGrid(2);
+
+        sendOrderedMultiThreadsWith2Node(ignite2, ignite1.message().withAsync());
+    }
+
+    /**
+     * @param ignite2 Ignite 2.
+     * @param ignMsg IgniteMessage.
+     */
+    private void sendOrderedMultiThreadsWith2Node(
+            final Ignite ignite2,
+            final IgniteMessaging ignMsg
+    ) throws InterruptedException {
+        final ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap();
+        final ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap();
+
+        final List<String> msgs = orderedMsg();
+
+        sendOrderedMultiThreadsWith2Node(ignite2, ignMsg, expMsg, actlMsg, msgs);
+
+    }
+
+
+    /**
+     * @param ignMsg IgniteMessage.
+     */
+    private void sendOrderedMultiThreads(
+            final IgniteMessaging ignMsg
+    ) throws InterruptedException {
+        final ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap();
+        final ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap();
+
+        final List<String> msgs = orderedMsg();
+
+        sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs);
+
+    }
+
+    /**
+     * @param ignite2 Ignite 2.
+     * @param ignMsg Ignite for send message.
+     * @param expMsg Expected messages map.
+     * @param actlMsg Actual message map.
+     * @param msgs List msgs.
+     */
+    private void sendOrderedMultiThreadsWith2Node(
+            final Ignite ignite2,
+            final IgniteMessaging ignMsg,
+            final ConcurrentMap<String, List<String>> expMsg,
+            final ConcurrentMap<String, List<String>> actlMsg,
+            final List<String> msgs
+    ) throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(threads * msgs.size());
+
+        final ConcurrentMap<String, List<String>> actlMsgNode2 = Maps.newConcurrentMap();
+
+        ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() {
+            @Override public boolean apply(UUID uuid, Message msg) {
+                actlMsgNode2.putIfAbsent(msg.threadName, Lists.<String>newArrayList());
+                actlMsgNode2.get(msg.threadName).add(msg.message);
+                latch.countDown();
+                return true;
+            }
+        });
+
+        sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs);
+
+        latch.await();
+
+        assertEquals(expMsg.size(), actlMsgNode2.size());
+
+        for (Map.Entry<String, List<String>> entry : expMsg.entrySet())
+            assertTrue(actlMsgNode2.get(entry.getKey()).equals(entry.getValue()));
+    }
+
+    /**
+     * @param ignMsg Ignite for send message.
+     * @param expMsg Expected messages map.
+     * @param actlMsg Actual message map.
+     * @param msgs List msgs.
+     */
+    private void sendOrderedMultiThreads(
+            final IgniteMessaging ignMsg,
+            final ConcurrentMap<String, List<String>> expMsg,
+            final ConcurrentMap<String, List<String>> actlMsg,
+            final List<String> msgs
+    ) throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(threads * msgs.size());
+
+        ignMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() {
+            @Override public boolean apply(UUID uuid, Message msg) {
+                actlMsg.putIfAbsent(msg.threadName, Lists.<String>newArrayList());
+                actlMsg.get(msg.threadName).add(msg.message);
+                latch.countDown();
+                return true;
+            }
+        });
+
+        for (int i = 0; i < threads; i++)
+            new Thread(new Runnable() {
+                @Override public void run() {
+                    String thdName = Thread.currentThread().getName();
+                    List<String> exp = Lists.newArrayList();
+                    expMsg.put(thdName, exp);
+
+                    for (String msg : msgs) {
+                        exp.add(msg);
+                        ignMsg.sendOrdered(TOPIC, new Message(thdName, msg), 1000);
+                    }
+
+                }
+            }).start();
+
+        latch.await();
+
+        assertEquals(expMsg.size(), actlMsg.size());
+
+        for (Map.Entry<String, List<String>> entry : expMsg.entrySet())
+            assertTrue(actlMsg.get(entry.getKey()).equals(entry.getValue()));
+    }
+
+    /**
+     */
+    private class Message implements Serializable{
+        /** Thread name. */
+        private String threadName;
+        /** Message. */
+        private String message;
+
+        /**
+         * @param threadName Thread name.
+         * @param msg Message.
+         */
+        private Message(String threadName, String msg) {
+            this.threadName = threadName;
+            this.message = msg;
+        }
+    }
+
+    /**
      * @param igniteMsg Ignite message.
      * @param msgStr    Message string.
      * @param cls       Callback for compare result.
@@ -277,22 +452,22 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme
      * @param msgs  messages for send.
      * @param cls Callback for compare result.
      */
-    private void sendOrdered(
+    private<T> void sendOrdered(
             final IgniteMessaging igniteMsg,
-            final List<String> msgs,
-            final IgniteBiInClosure<List<String>,List<Thread>> cls
+            final List<T> msgs,
+            final IgniteBiInClosure<List<T>,List<Thread>> cls
     ) throws InterruptedException {
 
         final CountDownLatch latch = new CountDownLatch(msgs.size());
 
-        final List<String> received = Lists.newArrayList();
+        final List<T> received = Lists.newArrayList();
         final List<Thread> threads = Lists.newArrayList();
 
-        for (String msg : msgs)
+        for (T msg : msgs)
             igniteMsg.sendOrdered(TOPIC, msg, 1000);
 
-        igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
-            @Override public boolean apply(UUID uuid, String s) {
+        igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, T>() {
+            @Override public boolean apply(UUID uuid, T s) {
                 received.add(s);
                 threads.add(Thread.currentThread());
                 latch.countDown();
@@ -312,7 +487,7 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme
         final List<String> msgs = Lists.newArrayList();
 
         for (int i = 0; i < 1000; i++)
-            msgs.add("" + i);
+            msgs.add("" + ThreadLocalRandom8.current().nextInt());
 
         return msgs;
     }