You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/11 08:27:57 UTC
incubator-ignite git commit: ignite-471-2: disable a test for
portable marshaller
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-471-2 4007a4303 -> d64185b82
ignite-471-2: disable a test for portable marshaller
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d64185b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d64185b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d64185b8
Branch: refs/heads/ignite-471-2
Commit: d64185b82640d994b263a9558909a7b86546743c
Parents: 4007a43
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Jun 11 09:27:46 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Jun 11 09:27:46 2015 +0300
----------------------------------------------------------------------
.../ignite/messaging/GridMessagingSelfTest.java | 557 ++++++++++---------
.../ignite/testsuites/IgniteBasicTestSuite.java | 2 +-
2 files changed, 295 insertions(+), 264 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d64185b8/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
index fb4ac8c..455bc8c 100644
--- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
@@ -75,6 +75,9 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
/** Shared IP finder. */
private final transient TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+ /** */
+ protected static CountDownLatch rcvLatch;
+
/**
* A test message topic.
*/
@@ -180,7 +183,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
-
+
TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
discoSpi.setIpFinder(ipFinder);
@@ -196,9 +199,34 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
* @throws Exception If error occurs.
*/
public void testSendReceiveMessage() throws Exception {
- ReceiveRemoteMessageListener<UUID, Object> list = new ReceiveRemoteMessageListener<>(ignite2, 3);
+ final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>();
+
+ final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable
+
+ final CountDownLatch rcvLatch = new CountDownLatch(3);
+
+ ignite1.message().localListen(null, new P2<UUID, Object>() {
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ try {
+ log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
- ignite1.message().localListen(null, list);
+ if (!nodeId.equals(ignite2.cluster().localNode().id())) {
+ log.error("Unexpected sender node: " + nodeId);
+
+ error.set(true);
+
+ return false;
+ }
+
+ rcvMsgs.add(msg);
+
+ return true;
+ }
+ finally {
+ rcvLatch.countDown();
+ }
+ }
+ });
ClusterGroup rNode1 = ignite2.cluster().forRemotes();
@@ -206,13 +234,13 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
message(rNode1).send(null, MSG_2);
message(rNode1).send(null, MSG_3);
- assertTrue(list.rcvLatch.await(3, TimeUnit.SECONDS));
+ assertTrue(rcvLatch.await(3, TimeUnit.SECONDS));
- assertFalse(list.error.get());
+ assertFalse(error.get());
- assertTrue(list.rcvMsgs.contains(MSG_1));
- assertTrue(list.rcvMsgs.contains(MSG_2));
- assertTrue(list.rcvMsgs.contains(MSG_3));
+ assertTrue(rcvMsgs.contains(MSG_1));
+ assertTrue(rcvMsgs.contains(MSG_2));
+ assertTrue(rcvMsgs.contains(MSG_3));
}
/**
@@ -579,9 +607,24 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
* @throws Exception If error occurs.
*/
public void testRemoteListen() throws Exception {
- MessageReceiverListener list = new MessageReceiverListener();
+ final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>();
- ignite2.message().remoteListen(null, list);
+ rcvLatch = new CountDownLatch(4);
+
+ ignite2.message().remoteListen(null, new P2<UUID, Object>() {
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ try {
+ log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+
+ rcvMsgs.add(msg);
+
+ return true;
+ }
+ finally {
+ rcvLatch.countDown();
+ }
+ }
+ });
ClusterGroup prj2 = ignite1.cluster().forRemotes(); // Includes node from grid2.
@@ -589,11 +632,11 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
message(prj2).send(null, MSG_2);
message(ignite2.cluster().forLocal()).send(null, MSG_3);
- assertFalse(list.rcvLatch.await(3, TimeUnit.SECONDS)); // We should get only 3 message.
+ assertFalse(rcvLatch.await(3, TimeUnit.SECONDS)); // We should get only 3 message.
- assertTrue(list.rcvMsgs.contains(MSG_1));
- assertTrue(list.rcvMsgs.contains(MSG_2));
- assertTrue(list.rcvMsgs.contains(MSG_3));
+ assertTrue(rcvMsgs.contains(MSG_1));
+ assertTrue(rcvMsgs.contains(MSG_2));
+ assertTrue(rcvMsgs.contains(MSG_3));
}
/**
@@ -601,19 +644,45 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
*/
@SuppressWarnings("TooBroadScope")
public void testStopRemoteListen() throws Exception {
- final IncrementTestListener list1 = new IncrementTestListener();
- final IncrementTestListener list2 = new IncrementTestListener();
- final IncrementTestListener list3 = new IncrementTestListener();
+ final AtomicInteger msgCnt1 = new AtomicInteger();
+
+ final AtomicInteger msgCnt2 = new AtomicInteger();
+
+ final AtomicInteger msgCnt3 = new AtomicInteger();
final String topic1 = null;
final String topic2 = "top2";
final String topic3 = "top3";
- UUID id1 = ignite2.message().remoteListen(topic1, list1);
+ UUID id1 = ignite2.message().remoteListen(topic1, new P2<UUID, Object>() {
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ System.out.println(Thread.currentThread().getName() + " Listener1 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
- UUID id2 = ignite2.message().remoteListen(topic2, list2);
+ msgCnt1.incrementAndGet();
- UUID id3 = ignite2.message().remoteListen(topic3, list3);
+ return true;
+ }
+ });
+
+ UUID id2 = ignite2.message().remoteListen(topic2, new P2<UUID, Object>() {
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ System.out.println(Thread.currentThread().getName() + " Listener2 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+
+ msgCnt2.incrementAndGet();
+
+ return true;
+ }
+ });
+
+ UUID id3 = ignite2.message().remoteListen(topic3, new P2<UUID, Object>() {
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ System.out.println(Thread.currentThread().getName() + " Listener3 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+
+ msgCnt3.incrementAndGet();
+
+ return true;
+ }
+ });
message(ignite1.cluster().forRemotes()).send(topic1, "msg1-1");
message(ignite1.cluster().forRemotes()).send(topic2, "msg1-2");
@@ -621,13 +690,13 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
- return list1.msgCnt.get() > 0 && list2.msgCnt.get() > 0 && list3.msgCnt.get() > 0;
+ return msgCnt1.get() > 0 && msgCnt2.get() > 0 && msgCnt3.get() > 0;
}
}, 5000);
- assertEquals(1, list1.msgCnt.get());
- assertEquals(1, list2.msgCnt.get());
- assertEquals(1, list3.msgCnt.get());
+ assertEquals(1, msgCnt1.get());
+ assertEquals(1, msgCnt2.get());
+ assertEquals(1, msgCnt3.get());
ignite2.message().stopRemoteListen(id2);
@@ -637,13 +706,13 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
- return list1.msgCnt.get() > 1 && list3.msgCnt.get() > 1;
+ return msgCnt1.get() > 1 && msgCnt3.get() > 1;
}
}, 5000);
- assertEquals(2, list1.msgCnt.get());
- assertEquals(1, list2.msgCnt.get());
- assertEquals(2, list3.msgCnt.get());
+ assertEquals(2, msgCnt1.get());
+ assertEquals(1, msgCnt2.get());
+ assertEquals(2, msgCnt3.get());
ignite2.message().stopRemoteListen(id2); // Try remove one more time.
@@ -656,9 +725,9 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
U.sleep(1000);
- assertEquals(2, list1.msgCnt.get());
- assertEquals(1, list2.msgCnt.get());
- assertEquals(2, list3.msgCnt.get());
+ assertEquals(2, msgCnt1.get());
+ assertEquals(1, msgCnt2.get());
+ assertEquals(2, msgCnt3.get());
}
/**
@@ -673,21 +742,46 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
new TestMessage(MSG_2, 3000),
new TestMessage(MSG_3));
- ReceiveRemoteMessageListener receiver = new ReceiveRemoteMessageListener<>(ignite1, 3);
+ final Collection<Object> rcvMsgs = new ConcurrentLinkedDeque<>();
+
+ final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable
+
+ rcvLatch = new CountDownLatch(3);
+
+ ignite2.message().remoteListen(S_TOPIC_1, new P2<UUID, Object>() {
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ try {
+ log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+
+ if (!nodeId.equals(ignite1.cluster().localNode().id())) {
+ log.error("Unexpected sender node: " + nodeId);
+
+ error.set(true);
+
+ return false;
+ }
- ignite2.message().remoteListen(S_TOPIC_1, receiver);
+ rcvMsgs.add(msg);
+
+ return true;
+ }
+ finally {
+ rcvLatch.countDown();
+ }
+ }
+ });
ClusterGroup prj2 = ignite1.cluster().forRemotes(); // Includes node from grid2.
for (TestMessage msg : msgs)
message(prj2).sendOrdered(S_TOPIC_1, msg, 15000);
- assertTrue(receiver.rcvLatch.await(6, TimeUnit.SECONDS));
+ assertTrue(rcvLatch.await(6, TimeUnit.SECONDS));
- assertFalse(receiver.error.get());
+ assertFalse(error.get());
//noinspection AssertEqualsBetweenInconvertibleTypes
- assertEquals(msgs, Arrays.asList(receiver.rcvMsgs.toArray()));
+ assertEquals(msgs, Arrays.asList(rcvMsgs.toArray()));
}
/**
@@ -697,17 +791,122 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
* @throws Exception If error occurs.
*/
public void testRemoteListenWithIntTopic() throws Exception {
- ListenWithIntTopic topList1 = new ListenWithIntTopic(ignite1, ignite2, I_TOPIC_1, MSG_1);
+ final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>();
+
+ final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable
+
+ rcvLatch = new CountDownLatch(3);
+
+ ignite2.message().remoteListen(I_TOPIC_1, new P2<UUID, Object>() {
+ @IgniteInstanceResource
+ private transient Ignite g;
+
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ assertEquals(ignite2, g);
+
+ try {
+ log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId +
+ ", topic=" + I_TOPIC_1 + ']');
+
+ if (!nodeId.equals(ignite1.cluster().localNode().id())) {
+ log.error("Unexpected sender node: " + nodeId);
+
+ error.set(true);
+
+ return false;
+ }
+
+ if (!MSG_1.equals(msg)) {
+ log.error("Unexpected message " + msg + " for topic: " + I_TOPIC_1);
+
+ error.set(true);
+
+ return false;
+ }
+
+ rcvMsgs.add(msg);
+
+ return true;
+ }
+ finally {
+ rcvLatch.countDown();
+ }
+ }
+ });
+
+ ignite2.message().remoteListen(I_TOPIC_2, new P2<UUID, Object>() {
+ @IgniteInstanceResource
+ private transient Ignite g;
+
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ assertEquals(ignite2, g);
+
+ try {
+ log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId +
+ ", topic=" + I_TOPIC_2 + ']');
+
+ if (!nodeId.equals(ignite1.cluster().localNode().id())) {
+ log.error("Unexpected sender node: " + nodeId);
+
+ error.set(true);
+
+ return false;
+ }
+
+ if (!MSG_2.equals(msg)) {
+ log.error("Unexpected message " + msg + " for topic: " + I_TOPIC_2);
+
+ error.set(true);
+
+ return false;
+ }
+
+ rcvMsgs.add(msg);
+
+ return true;
+ }
+ finally {
+ rcvLatch.countDown();
+ }
+ }
+ });
+
+ ignite2.message().remoteListen(null, new P2<UUID, Object>() {
+ @IgniteInstanceResource
+ private transient Ignite g;
+
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ assertEquals(ignite2, g);
+
+ try {
+ log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId +
+ ", topic=default]");
+
+ if (!nodeId.equals(ignite1.cluster().localNode().id())) {
+ log.error("Unexpected sender node: " + nodeId);
- ListenWithIntTopic topList2 = new ListenWithIntTopic(ignite1, ignite2, I_TOPIC_2, MSG_2);
+ error.set(true);
- ListenWithIntTopic topList3 = new ListenWithIntTopic(ignite1, ignite2, null, MSG_3);
+ return false;
+ }
- ignite2.message().remoteListen(I_TOPIC_1, topList1);
+ if (!MSG_3.equals(msg)) {
+ log.error("Unexpected message " + msg + " for topic: default");
- ignite2.message().remoteListen(I_TOPIC_2, topList2);
+ error.set(true);
- ignite2.message().remoteListen(null, topList3);
+ return false;
+ }
+
+ rcvMsgs.add(msg);
+
+ return true;
+ }
+ finally {
+ rcvLatch.countDown();
+ }
+ }
+ });
ClusterGroup prj2 = ignite1.cluster().forRemotes(); // Includes node from grid2.
@@ -715,17 +914,13 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
message(prj2).send(I_TOPIC_2, MSG_2);
message(prj2).send(null, MSG_3);
- assertTrue(topList1.rcvLatch.await(3, TimeUnit.SECONDS));
- assertTrue(topList2.rcvLatch.await(3, TimeUnit.SECONDS));
- assertTrue(topList3.rcvLatch.await(3, TimeUnit.SECONDS));
+ assertTrue(rcvLatch.await(3, TimeUnit.SECONDS));
- assertFalse(topList1.error.get());
- assertFalse(topList2.error.get());
- assertFalse(topList3.error.get());
+ assertFalse(error.get());
- assertTrue(topList1.rcvMsgs.contains(MSG_1));
- assertTrue(topList2.rcvMsgs.contains(MSG_2));
- assertTrue(topList3.rcvMsgs.contains(MSG_3));
+ assertTrue(rcvMsgs.contains(MSG_1));
+ assertTrue(rcvMsgs.contains(MSG_2));
+ assertTrue(rcvMsgs.contains(MSG_3));
}
/**
@@ -741,15 +936,36 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
Class rcCls = extLdr.loadClass(EXT_RESOURCE_CLS_NAME);
- ReceiveRemoteMessageListener list = new ReceiveRemoteMessageListener(ignite1, 1);
+ final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable
+
+ final CountDownLatch rcvLatch = new CountDownLatch(1);
- ignite2.message().remoteListen(S_TOPIC_1, list);
+ ignite2.message().remoteListen(S_TOPIC_1, new P2<UUID, Object>() {
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ try {
+ log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+
+ if (!nodeId.equals(ignite1.cluster().localNode().id())) {
+ log.error("Unexpected sender node: " + nodeId);
+
+ error.set(true);
+
+ return false;
+ }
+
+ return true;
+ }
+ finally {
+ rcvLatch.countDown();
+ }
+ }
+ });
message(ignite1.cluster().forRemotes()).send(S_TOPIC_1, Collections.singleton(rcCls.newInstance()));
- assertTrue(list.rcvLatch.await(3, TimeUnit.SECONDS));
+ assertTrue(rcvLatch.await(3, TimeUnit.SECONDS));
- assertFalse(list.error.get());
+ assertFalse(error.get());
}
/**
@@ -796,6 +1012,8 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
* @throws Exception If failed.
*/
public void testAsync() throws Exception {
+ final AtomicInteger msgCnt = new AtomicInteger();
+
assertFalse(ignite2.message().isAsync());
final IgniteMessaging msg = ignite2.message().withAsync();
@@ -804,8 +1022,6 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
assertFalse(ignite2.message().isAsync());
- final IncrementTestListener list = new IncrementTestListener();
-
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
msg.future();
@@ -816,7 +1032,16 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
final String topic = "topic";
- UUID id = msg.remoteListen(topic, list);
+ UUID id = msg.remoteListen(topic, new P2<UUID, Object>() {
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ System.out.println(Thread.currentThread().getName() +
+ " Listener received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+
+ msgCnt.incrementAndGet();
+
+ return true;
+ }
+ });
Assert.assertNull(id);
@@ -840,11 +1065,11 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
- return list.msgCnt.get() > 0;
+ return msgCnt.get() > 0;
}
}, 5000);
- assertEquals(1, list.msgCnt.get());
+ assertEquals(1, msgCnt.get());
msg.stopRemoteListen(id);
@@ -866,7 +1091,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
U.sleep(1000);
- assertEquals(1, list.msgCnt.get());
+ assertEquals(1, msgCnt.get());
}
/**
@@ -890,7 +1115,6 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
/**
* @param expOldestIgnite Expected oldest ignite.
- * @throws InterruptedException If interrupted.
*/
private void remoteListenForOldest(Ignite expOldestIgnite) throws InterruptedException {
ClusterGroup grp = ignite1.cluster().forOldest();
@@ -898,213 +1122,20 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
assertEquals(1, grp.nodes().size());
assertEquals(expOldestIgnite.cluster().localNode().id(), grp.node().id());
- ignite1.message(grp).remoteListen(null, new ListenForOldestListener<UUID, Object>());
-
- ignite1.message().send(null, MSG_1);
-
- Thread.sleep(3000);
-
- assertEquals(1, MSG_CNT.get());
- }
-
- /**
- *
- */
- private static class IncrementTestListener<UUID, Object> implements P2<UUID, Object> {
- /** */
- final AtomicInteger msgCnt = new AtomicInteger();
-
- /** */
- @LoggerResource
- private transient IgniteLogger log;
-
- /** {@inheritDoc} */
- @Override public boolean apply(UUID nodeId, Object msg) {
- log.info("Listener received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
-
- msgCnt.incrementAndGet();
-
- return true;
- }
- }
-
- /**
- *
- */
- private static class ReceiveRemoteMessageListener<UUID, Object> implements P2<UUID, Object> {
- /** */
- final Collection<java.lang.Object> rcvMsgs = new ConcurrentLinkedDeque<>();
-
- /** */
- final AtomicBoolean error = new AtomicBoolean(false);
-
- /** */
- final CountDownLatch rcvLatch;
-
- /** */
- final Ignite sender;
-
- /** */
- @LoggerResource
- private transient IgniteLogger log;
-
- /**
- * @param sender
- * @param latchCount
- */
- public ReceiveRemoteMessageListener(Ignite sender, int latchCount) {
- this.sender = sender;
- rcvLatch = new CountDownLatch(latchCount);
- }
-
- /** {@inheritDoc} */
- @Override public boolean apply(UUID nodeId, Object msg) {
- try {
- log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
-
- if (!nodeId.equals(sender.cluster().localNode().id())) {
- log.info("Unexpected sender node: " + nodeId);
-
- error.set(true);
-
- return false;
- }
-
- rcvMsgs.add(msg);
-
- return true;
- }
- finally {
- rcvLatch.countDown();
- }
- }
- }
-
-
- /**
- *
- */
- private static class ListenForOldestListener<UUID, Object> implements P2<UUID, Object> {
- /** */
- @LoggerResource
- private transient IgniteLogger log;
-
- /** {@inheritDoc} */
- @Override public boolean apply(UUID nodeId, Object msg) {
- log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
-
- MSG_CNT.incrementAndGet();
-
- return true;
- }
- }
-
- /**
- *
- */
- private static class ListenWithIntTopic implements P2<UUID, Object> {
- /** */
- final Collection<java.lang.Object> rcvMsgs = new ConcurrentLinkedDeque<>();
-
- /** */
- final AtomicBoolean error = new AtomicBoolean(false);
-
- /** */
- final CountDownLatch rcvLatch = new CountDownLatch(1);
-
- /** */
- private final Ignite sender;
-
- /** */
- private final Ignite receiver;
-
- /** */
- @IgniteInstanceResource
- private transient Ignite g;
-
- /** */
- @LoggerResource
- private transient IgniteLogger log;
-
- /** */
- final Integer topic;
-
- /** */
- final String message;
-
- /**
- * @param sender
- * @param receiver
- * @param topic
- * @param message
- */
- public ListenWithIntTopic(Ignite sender, Ignite receiver, Integer topic, String message) {
- this.sender = sender;
- this.receiver = receiver;
- this.topic = topic;
- this.message = message;
- }
-
- /** {@inheritDoc} */
- @Override public boolean apply(UUID nodeId, Object msg) {
- assertEquals(receiver, g);
-
- try {
- log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId +
- ", topic=" + topic + ']');
-
- if (!nodeId.equals(sender.cluster().localNode().id())) {
- log.error("Unexpected sender node: " + nodeId);
-
- error.set(true);
-
- return false;
- }
-
- if (!message.equals(msg)) {
- log.error("Unexpected message " + msg + " for topic: " + topic);
-
- error.set(true);
-
- return false;
- }
+ ignite1.message(grp).remoteListen(null, new P2<UUID, Object>() {
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ System.out.println("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
- rcvMsgs.add(msg);
+ MSG_CNT.incrementAndGet();
return true;
}
- finally {
- rcvLatch.countDown();
- }
- }
- }
-
- /**
- *
- */
- private static class MessageReceiverListener<UUID, Object> implements P2<UUID, Object> {
- /** */
- final Collection<java.lang.Object> rcvMsgs = new ConcurrentLinkedDeque<>();
-
- /** */
- final CountDownLatch rcvLatch = new CountDownLatch(4);
-
- /** */
- @LoggerResource
- private transient IgniteLogger log;
+ });
- /** {@inheritDoc} */
- @Override public boolean apply(UUID nodeId, Object msg) {
- try {
- log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+ ignite1.message().send(null, MSG_1);
- rcvMsgs.add(msg);
+ Thread.sleep(3000);
- return true;
- }
- finally {
- rcvLatch.countDown();
- }
- }
+ assertEquals(1, MSG_CNT.get());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d64185b8/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 8c061be..949b76d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -66,7 +66,7 @@ public class IgniteBasicTestSuite extends TestSuite {
suite.addTest(new TestSuite(GridSelfTest.class));
GridTestUtils.addTestIfNeeded(suite, GridProjectionSelfTest.class, ignoredTests);
- suite.addTest(new TestSuite(GridMessagingSelfTest.class));
+ GridTestUtils.addTestIfNeeded(suite, GridMessagingSelfTest.class, ignoredTests);
suite.addTest(new TestSuite(IgniteMessagingWithClientTest.class));
GridTestUtils.addTestIfNeeded(suite, GridMessagingNoPeerClassLoadingSelfTest.class, ignoredTests);