You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/14 13:11:47 UTC
[28/34] ignite git commit: ignite-3727 added test for multi-thread
sendOrdered
ignite-3727 added test for multi-thread sendOrdered
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c9ed5c65
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c9ed5c65
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c9ed5c65
Branch: refs/heads/ignite-3727-2
Commit: c9ed5c6583e803110a34fc0126ae4e5af56331b1
Parents: de59444
Author: DmitriyGovorukhin <dg...@gridgain.com>
Authored: Wed Sep 14 12:58:20 2016 +0300
Committer: DmitriyGovorukhin <dg...@gridgain.com>
Committed: Wed Sep 14 12:58:20 2016 +0300
----------------------------------------------------------------------
.../messaging/IgniteMessagingSendAsyncTest.java | 191 ++++++++++++++++++-
1 file changed, 183 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ed5c65/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
index 4cfff38..73baeba 100644
--- a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
@@ -18,16 +18,20 @@
package org.apache.ignite.messaging;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jsr166.ThreadLocalRandom8;
import org.junit.Assert;
import java.io.Serializable;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
@@ -45,6 +49,9 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme
*/
private final String msgStr = "message";
+ /** Count threads for multi-thread test */
+ private final int threads = 10;
+
/**
* {@inheritDoc}
*/
@@ -183,6 +190,174 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme
}
/**
+ */
+ public void testSendOrderedDefaultModeMultiThreads() throws Exception {
+ Ignite ignite = startGrid(1);
+
+ sendOrderedMultiThreads(ignite.message());
+ }
+
+ /**
+ */
+ public void testSendOrderedAsyncModeMultiThreads() throws Exception {
+ Ignite ignite = startGrid(1);
+
+ sendOrderedMultiThreads(ignite.message().withAsync());
+ }
+
+ /**
+ */
+ public void testSendOrderedDefaultModeMultiThreadsWith2Node() throws Exception {
+ Ignite ignite1 = startGrid(1);
+ Ignite ignite2 = startGrid(2);
+
+ sendOrderedMultiThreadsWith2Node(ignite2, ignite1.message());
+ }
+
+ /**
+ */
+ public void testSendOrderedAsyncModeMultiThreadsWith2Node() throws Exception {
+ Ignite ignite1 = startGrid(1);
+ Ignite ignite2 = startGrid(2);
+
+ sendOrderedMultiThreadsWith2Node(ignite2, ignite1.message().withAsync());
+ }
+
+ /**
+ * @param ignite2 Ignite 2.
+ * @param ignMsg IgniteMessage.
+ */
+ private void sendOrderedMultiThreadsWith2Node(
+ final Ignite ignite2,
+ final IgniteMessaging ignMsg
+ ) throws InterruptedException {
+ final ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap();
+ final ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap();
+
+ final List<String> msgs = orderedMsg();
+
+ sendOrderedMultiThreadsWith2Node(ignite2, ignMsg, expMsg, actlMsg, msgs);
+
+ }
+
+
+ /**
+ * @param ignMsg IgniteMessage.
+ */
+ private void sendOrderedMultiThreads(
+ final IgniteMessaging ignMsg
+ ) throws InterruptedException {
+ final ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap();
+ final ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap();
+
+ final List<String> msgs = orderedMsg();
+
+ sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs);
+
+ }
+
+ /**
+ * @param ignite2 Ignite 2.
+ * @param ignMsg Ignite for send message.
+ * @param expMsg Expected messages map.
+ * @param actlMsg Actual message map.
+ * @param msgs List msgs.
+ */
+ private void sendOrderedMultiThreadsWith2Node(
+ final Ignite ignite2,
+ final IgniteMessaging ignMsg,
+ final ConcurrentMap<String, List<String>> expMsg,
+ final ConcurrentMap<String, List<String>> actlMsg,
+ final List<String> msgs
+ ) throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(threads * msgs.size());
+
+ final ConcurrentMap<String, List<String>> actlMsgNode2 = Maps.newConcurrentMap();
+
+ ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() {
+ @Override public boolean apply(UUID uuid, Message msg) {
+ actlMsgNode2.putIfAbsent(msg.threadName, Lists.<String>newArrayList());
+ actlMsgNode2.get(msg.threadName).add(msg.message);
+ latch.countDown();
+ return true;
+ }
+ });
+
+ sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs);
+
+ latch.await();
+
+ assertEquals(expMsg.size(), actlMsgNode2.size());
+
+ for (Map.Entry<String, List<String>> entry : expMsg.entrySet())
+ assertTrue(actlMsgNode2.get(entry.getKey()).equals(entry.getValue()));
+ }
+
+ /**
+ * @param ignMsg Ignite for send message.
+ * @param expMsg Expected messages map.
+ * @param actlMsg Actual message map.
+ * @param msgs List msgs.
+ */
+ private void sendOrderedMultiThreads(
+ final IgniteMessaging ignMsg,
+ final ConcurrentMap<String, List<String>> expMsg,
+ final ConcurrentMap<String, List<String>> actlMsg,
+ final List<String> msgs
+ ) throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(threads * msgs.size());
+
+ ignMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() {
+ @Override public boolean apply(UUID uuid, Message msg) {
+ actlMsg.putIfAbsent(msg.threadName, Lists.<String>newArrayList());
+ actlMsg.get(msg.threadName).add(msg.message);
+ latch.countDown();
+ return true;
+ }
+ });
+
+ for (int i = 0; i < threads; i++)
+ new Thread(new Runnable() {
+ @Override public void run() {
+ String thdName = Thread.currentThread().getName();
+ List<String> exp = Lists.newArrayList();
+ expMsg.put(thdName, exp);
+
+ for (String msg : msgs) {
+ exp.add(msg);
+ ignMsg.sendOrdered(TOPIC, new Message(thdName, msg), 1000);
+ }
+
+ }
+ }).start();
+
+ latch.await();
+
+ assertEquals(expMsg.size(), actlMsg.size());
+
+ for (Map.Entry<String, List<String>> entry : expMsg.entrySet())
+ assertTrue(actlMsg.get(entry.getKey()).equals(entry.getValue()));
+ }
+
+ /**
+ */
+ private class Message implements Serializable{
+ /** Thread name. */
+ private String threadName;
+ /** Message. */
+ private String message;
+
+ /**
+ * @param threadName Thread name.
+ * @param msg Message.
+ */
+ private Message(String threadName, String msg) {
+ this.threadName = threadName;
+ this.message = msg;
+ }
+ }
+
+ /**
* @param igniteMsg Ignite message.
* @param msgStr Message string.
* @param cls Callback for compare result.
@@ -277,22 +452,22 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme
* @param msgs messages for send.
* @param cls Callback for compare result.
*/
- private void sendOrdered(
+ private<T> void sendOrdered(
final IgniteMessaging igniteMsg,
- final List<String> msgs,
- final IgniteBiInClosure<List<String>,List<Thread>> cls
+ final List<T> msgs,
+ final IgniteBiInClosure<List<T>,List<Thread>> cls
) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(msgs.size());
- final List<String> received = Lists.newArrayList();
+ final List<T> received = Lists.newArrayList();
final List<Thread> threads = Lists.newArrayList();
- for (String msg : msgs)
+ for (T msg : msgs)
igniteMsg.sendOrdered(TOPIC, msg, 1000);
- igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
- @Override public boolean apply(UUID uuid, String s) {
+ igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, T>() {
+ @Override public boolean apply(UUID uuid, T s) {
received.add(s);
threads.add(Thread.currentThread());
latch.countDown();
@@ -312,7 +487,7 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme
final List<String> msgs = Lists.newArrayList();
for (int i = 0; i < 1000; i++)
- msgs.add("" + i);
+ msgs.add("" + ThreadLocalRandom8.current().nextInt());
return msgs;
}