You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/20 12:02:35 UTC
[18/19] incubator-ignite git commit: ignite-471-2: GridQueryProcessor
fix and other test suites fixes
ignite-471-2: GridQueryProcessor fix and other test suites fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/490a2526
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/490a2526
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/490a2526
Branch: refs/heads/ignite-471-2
Commit: 490a2526906e7d22b1e20da2436f664393dc65f5
Parents: 379c73f
Author: Denis Magda <dm...@gridgain.com>
Authored: Wed May 20 12:38:23 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed May 20 12:38:23 2015 +0300
----------------------------------------------------------------------
.../ignite/messaging/GridMessagingSelfTest.java | 559 +++++++++----------
.../ignite/testframework/GridTestUtils.java | 16 +-
.../ignite/testsuites/IgniteBasicTestSuite.java | 26 +-
.../IgniteMarshallerSelfTestSuite.java | 28 +-
4 files changed, 317 insertions(+), 312 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/490a2526/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
index c033750..36c2a9f 100644
--- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
@@ -24,7 +24,6 @@ import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.optimized.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
@@ -182,8 +181,6 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- ((OptimizedMarshaller)cfg.getMarshaller()).setRequireSerializable(false);
-
TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
discoSpi.setIpFinder(ipFinder);
@@ -199,34 +196,9 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
* @throws Exception If error occurs.
*/
public void testSendReceiveMessage() throws Exception {
- final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>();
-
- final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable
-
- final CountDownLatch rcvLatch = new CountDownLatch(3);
-
- ignite1.message().localListen(null, new P2<UUID, Object>() {
- @Override public boolean apply(UUID nodeId, Object msg) {
- try {
- log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
-
- if (!nodeId.equals(ignite2.cluster().localNode().id())) {
- log.error("Unexpected sender node: " + nodeId);
+ ReceiveRemoteMessageListener<UUID, Object> list = new ReceiveRemoteMessageListener<>(ignite2, 3);
- error.set(true);
-
- return false;
- }
-
- rcvMsgs.add(msg);
-
- return true;
- }
- finally {
- rcvLatch.countDown();
- }
- }
- });
+ ignite1.message().localListen(null, list);
ClusterGroup rNode1 = ignite2.cluster().forRemotes();
@@ -234,13 +206,13 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
message(rNode1).send(null, MSG_2);
message(rNode1).send(null, MSG_3);
- assertTrue(rcvLatch.await(3, TimeUnit.SECONDS));
+ assertTrue(list.rcvLatch.await(3, TimeUnit.SECONDS));
- assertFalse(error.get());
+ assertFalse(list.error.get());
- assertTrue(rcvMsgs.contains(MSG_1));
- assertTrue(rcvMsgs.contains(MSG_2));
- assertTrue(rcvMsgs.contains(MSG_3));
+ assertTrue(list.rcvMsgs.contains(MSG_1));
+ assertTrue(list.rcvMsgs.contains(MSG_2));
+ assertTrue(list.rcvMsgs.contains(MSG_3));
}
/**
@@ -607,24 +579,9 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
* @throws Exception If error occurs.
*/
public void testRemoteListen() throws Exception {
- final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>();
-
- final CountDownLatch rcvLatch = new CountDownLatch(4);
+ MessageReceiverListener list = new MessageReceiverListener();
- ignite2.message().remoteListen(null, new P2<UUID, Object>() {
- @Override public boolean apply(UUID nodeId, Object msg) {
- try {
- log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
-
- rcvMsgs.add(msg);
-
- return true;
- }
- finally {
- rcvLatch.countDown();
- }
- }
- });
+ ignite2.message().remoteListen(null, list);
ClusterGroup prj2 = ignite1.cluster().forRemotes(); // Includes node from grid2.
@@ -632,11 +589,11 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
message(prj2).send(null, MSG_2);
message(ignite2.cluster().forLocal()).send(null, MSG_3);
- assertFalse(rcvLatch.await(3, TimeUnit.SECONDS)); // We should get only 3 message.
+ assertFalse(list.rcvLatch.await(3, TimeUnit.SECONDS)); // We should get only 3 message.
- assertTrue(rcvMsgs.contains(MSG_1));
- assertTrue(rcvMsgs.contains(MSG_2));
- assertTrue(rcvMsgs.contains(MSG_3));
+ assertTrue(list.rcvMsgs.contains(MSG_1));
+ assertTrue(list.rcvMsgs.contains(MSG_2));
+ assertTrue(list.rcvMsgs.contains(MSG_3));
}
/**
@@ -644,45 +601,19 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
*/
@SuppressWarnings("TooBroadScope")
public void testStopRemoteListen() throws Exception {
- final AtomicInteger msgCnt1 = new AtomicInteger();
-
- final AtomicInteger msgCnt2 = new AtomicInteger();
-
- final AtomicInteger msgCnt3 = new AtomicInteger();
+ final IncrementTestListener list1 = new IncrementTestListener();
+ final IncrementTestListener list2 = new IncrementTestListener();
+ final IncrementTestListener list3 = new IncrementTestListener();
final String topic1 = null;
final String topic2 = "top2";
final String topic3 = "top3";
- UUID id1 = ignite2.message().remoteListen(topic1, new P2<UUID, Object>() {
- @Override public boolean apply(UUID nodeId, Object msg) {
- System.out.println(Thread.currentThread().getName() + " Listener1 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
-
- msgCnt1.incrementAndGet();
-
- return true;
- }
- });
-
- UUID id2 = ignite2.message().remoteListen(topic2, new P2<UUID, Object>() {
- @Override public boolean apply(UUID nodeId, Object msg) {
- System.out.println(Thread.currentThread().getName() + " Listener2 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
-
- msgCnt2.incrementAndGet();
-
- return true;
- }
- });
-
- UUID id3 = ignite2.message().remoteListen(topic3, new P2<UUID, Object>() {
- @Override public boolean apply(UUID nodeId, Object msg) {
- System.out.println(Thread.currentThread().getName() + " Listener3 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+ UUID id1 = ignite2.message().remoteListen(topic1, list1);
- msgCnt3.incrementAndGet();
+ UUID id2 = ignite2.message().remoteListen(topic2, list2);
- return true;
- }
- });
+ UUID id3 = ignite2.message().remoteListen(topic3, list3);
message(ignite1.cluster().forRemotes()).send(topic1, "msg1-1");
message(ignite1.cluster().forRemotes()).send(topic2, "msg1-2");
@@ -690,13 +621,13 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
- return msgCnt1.get() > 0 && msgCnt2.get() > 0 && msgCnt3.get() > 0;
+ return list1.msgCnt.get() > 0 && list2.msgCnt.get() > 0 && list3.msgCnt.get() > 0;
}
}, 5000);
- assertEquals(1, msgCnt1.get());
- assertEquals(1, msgCnt2.get());
- assertEquals(1, msgCnt3.get());
+ assertEquals(1, list1.msgCnt.get());
+ assertEquals(1, list2.msgCnt.get());
+ assertEquals(1, list3.msgCnt.get());
ignite2.message().stopRemoteListen(id2);
@@ -706,13 +637,13 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
- return msgCnt1.get() > 1 && msgCnt3.get() > 1;
+ return list1.msgCnt.get() > 1 && list3.msgCnt.get() > 1;
}
}, 5000);
- assertEquals(2, msgCnt1.get());
- assertEquals(1, msgCnt2.get());
- assertEquals(2, msgCnt3.get());
+ assertEquals(2, list1.msgCnt.get());
+ assertEquals(1, list2.msgCnt.get());
+ assertEquals(2, list3.msgCnt.get());
ignite2.message().stopRemoteListen(id2); // Try remove one more time.
@@ -725,9 +656,9 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
U.sleep(1000);
- assertEquals(2, msgCnt1.get());
- assertEquals(1, msgCnt2.get());
- assertEquals(2, msgCnt3.get());
+ assertEquals(2, list1.msgCnt.get());
+ assertEquals(1, list2.msgCnt.get());
+ assertEquals(2, list3.msgCnt.get());
}
/**
@@ -742,46 +673,21 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
new TestMessage(MSG_2, 3000),
new TestMessage(MSG_3));
- final Collection<Object> rcvMsgs = new ConcurrentLinkedDeque<>();
+ ReceiveRemoteMessageListener receiver = new ReceiveRemoteMessageListener<>(ignite1, 3);
- final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable
-
- final CountDownLatch rcvLatch = new CountDownLatch(3);
-
- ignite2.message().remoteListen(S_TOPIC_1, new P2<UUID, Object>() {
- @Override public boolean apply(UUID nodeId, Object msg) {
- try {
- log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
-
- if (!nodeId.equals(ignite1.cluster().localNode().id())) {
- log.error("Unexpected sender node: " + nodeId);
-
- error.set(true);
-
- return false;
- }
-
- rcvMsgs.add(msg);
-
- return true;
- }
- finally {
- rcvLatch.countDown();
- }
- }
- });
+ ignite2.message().remoteListen(S_TOPIC_1, receiver);
ClusterGroup prj2 = ignite1.cluster().forRemotes(); // Includes node from grid2.
for (TestMessage msg : msgs)
message(prj2).sendOrdered(S_TOPIC_1, msg, 15000);
- assertTrue(rcvLatch.await(6, TimeUnit.SECONDS));
+ assertTrue(receiver.rcvLatch.await(6, TimeUnit.SECONDS));
- assertFalse(error.get());
+ assertFalse(receiver.error.get());
//noinspection AssertEqualsBetweenInconvertibleTypes
- assertEquals(msgs, Arrays.asList(rcvMsgs.toArray()));
+ assertEquals(msgs, Arrays.asList(receiver.rcvMsgs.toArray()));
}
/**
@@ -791,122 +697,17 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
* @throws Exception If error occurs.
*/
public void testRemoteListenWithIntTopic() throws Exception {
- final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>();
-
- final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable
-
- final CountDownLatch rcvLatch = new CountDownLatch(3);
-
- ignite2.message().remoteListen(I_TOPIC_1, new P2<UUID, Object>() {
- @IgniteInstanceResource
- private transient Ignite g;
-
- @Override public boolean apply(UUID nodeId, Object msg) {
- assertEquals(ignite2, g);
-
- try {
- log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId +
- ", topic=" + I_TOPIC_1 + ']');
-
- if (!nodeId.equals(ignite1.cluster().localNode().id())) {
- log.error("Unexpected sender node: " + nodeId);
-
- error.set(true);
-
- return false;
- }
-
- if (!MSG_1.equals(msg)) {
- log.error("Unexpected message " + msg + " for topic: " + I_TOPIC_1);
-
- error.set(true);
-
- return false;
- }
-
- rcvMsgs.add(msg);
-
- return true;
- }
- finally {
- rcvLatch.countDown();
- }
- }
- });
-
- ignite2.message().remoteListen(I_TOPIC_2, new P2<UUID, Object>() {
- @IgniteInstanceResource
- private transient Ignite g;
-
- @Override public boolean apply(UUID nodeId, Object msg) {
- assertEquals(ignite2, g);
-
- try {
- log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId +
- ", topic=" + I_TOPIC_2 + ']');
-
- if (!nodeId.equals(ignite1.cluster().localNode().id())) {
- log.error("Unexpected sender node: " + nodeId);
-
- error.set(true);
-
- return false;
- }
-
- if (!MSG_2.equals(msg)) {
- log.error("Unexpected message " + msg + " for topic: " + I_TOPIC_2);
-
- error.set(true);
-
- return false;
- }
-
- rcvMsgs.add(msg);
-
- return true;
- }
- finally {
- rcvLatch.countDown();
- }
- }
- });
-
- ignite2.message().remoteListen(null, new P2<UUID, Object>() {
- @IgniteInstanceResource
- private transient Ignite g;
-
- @Override public boolean apply(UUID nodeId, Object msg) {
- assertEquals(ignite2, g);
-
- try {
- log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId +
- ", topic=default]");
+ ListenWithIntTopic topList1 = new ListenWithIntTopic(ignite1, ignite2, I_TOPIC_1, MSG_1);
- if (!nodeId.equals(ignite1.cluster().localNode().id())) {
- log.error("Unexpected sender node: " + nodeId);
-
- error.set(true);
-
- return false;
- }
-
- if (!MSG_3.equals(msg)) {
- log.error("Unexpected message " + msg + " for topic: default");
+ ListenWithIntTopic topList2 = new ListenWithIntTopic(ignite1, ignite2, I_TOPIC_2, MSG_2);
- error.set(true);
+ ListenWithIntTopic topList3 = new ListenWithIntTopic(ignite1, ignite2, null, MSG_3);
- return false;
- }
+ ignite2.message().remoteListen(I_TOPIC_1, topList1);
- rcvMsgs.add(msg);
+ ignite2.message().remoteListen(I_TOPIC_2, topList2);
- return true;
- }
- finally {
- rcvLatch.countDown();
- }
- }
- });
+ ignite2.message().remoteListen(null, topList3);
ClusterGroup prj2 = ignite1.cluster().forRemotes(); // Includes node from grid2.
@@ -914,13 +715,17 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
message(prj2).send(I_TOPIC_2, MSG_2);
message(prj2).send(null, MSG_3);
- assertTrue(rcvLatch.await(3, TimeUnit.SECONDS));
+ assertTrue(topList1.rcvLatch.await(3, TimeUnit.SECONDS));
+ assertTrue(topList2.rcvLatch.await(3, TimeUnit.SECONDS));
+ assertTrue(topList3.rcvLatch.await(3, TimeUnit.SECONDS));
- assertFalse(error.get());
+ assertFalse(topList1.error.get());
+ assertFalse(topList2.error.get());
+ assertFalse(topList3.error.get());
- assertTrue(rcvMsgs.contains(MSG_1));
- assertTrue(rcvMsgs.contains(MSG_2));
- assertTrue(rcvMsgs.contains(MSG_3));
+ assertTrue(topList1.rcvMsgs.contains(MSG_1));
+ assertTrue(topList2.rcvMsgs.contains(MSG_2));
+ assertTrue(topList3.rcvMsgs.contains(MSG_3));
}
/**
@@ -936,36 +741,15 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
Class rcCls = extLdr.loadClass(EXT_RESOURCE_CLS_NAME);
- final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable
-
- final CountDownLatch rcvLatch = new CountDownLatch(1);
-
- ignite2.message().remoteListen(S_TOPIC_1, new P2<UUID, Object>() {
- @Override public boolean apply(UUID nodeId, Object msg) {
- try {
- log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
-
- if (!nodeId.equals(ignite1.cluster().localNode().id())) {
- log.error("Unexpected sender node: " + nodeId);
+ ReceiveRemoteMessageListener list = new ReceiveRemoteMessageListener(ignite1, 1);
- error.set(true);
-
- return false;
- }
-
- return true;
- }
- finally {
- rcvLatch.countDown();
- }
- }
- });
+ ignite2.message().remoteListen(S_TOPIC_1, list);
message(ignite1.cluster().forRemotes()).send(S_TOPIC_1, Collections.singleton(rcCls.newInstance()));
- assertTrue(rcvLatch.await(3, TimeUnit.SECONDS));
+ assertTrue(list.rcvLatch.await(3, TimeUnit.SECONDS));
- assertFalse(error.get());
+ assertFalse(list.error.get());
}
/**
@@ -1012,8 +796,6 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testAsync() throws Exception {
- final AtomicInteger msgCnt = new AtomicInteger();
-
assertFalse(ignite2.message().isAsync());
final IgniteMessaging msg = ignite2.message().withAsync();
@@ -1022,6 +804,8 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
assertFalse(ignite2.message().isAsync());
+ final IncrementTestListener list = new IncrementTestListener();
+
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
msg.future();
@@ -1032,16 +816,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
final String topic = "topic";
- UUID id = msg.remoteListen(topic, new P2<UUID, Object>() {
- @Override public boolean apply(UUID nodeId, Object msg) {
- System.out.println(Thread.currentThread().getName() +
- " Listener received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
-
- msgCnt.incrementAndGet();
-
- return true;
- }
- });
+ UUID id = msg.remoteListen(topic, list);
Assert.assertNull(id);
@@ -1065,11 +840,11 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
- return msgCnt.get() > 0;
+ return list.msgCnt.get() > 0;
}
}, 5000);
- assertEquals(1, msgCnt.get());
+ assertEquals(1, list.msgCnt.get());
msg.stopRemoteListen(id);
@@ -1091,7 +866,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
U.sleep(1000);
- assertEquals(1, msgCnt.get());
+ assertEquals(1, list.msgCnt.get());
}
/**
@@ -1122,20 +897,218 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
assertEquals(1, grp.nodes().size());
assertEquals(expOldestIgnite.cluster().localNode().id(), grp.node().id());
- ignite1.message(grp).remoteListen(null, new P2<UUID, Object>() {
- @Override public boolean apply(UUID nodeId, Object msg) {
- System.out.println("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+ ignite1.message(grp).remoteListen(null, new ListenForOldestListener<UUID, Object>());
+
+ ignite1.message().send(null, MSG_1);
+
+ Thread.sleep(3000);
+
+ assertEquals(1, MSG_CNT.get());
+ }
+
+ /**
+ * Listener that logs each received message and counts it in {@code msgCnt}. NOTE(review): {@code <UUID, Object>} declares type parameters that shadow the JDK types of the same name.
+ */
+ private static class IncrementTestListener<UUID, Object> implements P2<UUID, Object> {
+ /** Number of messages passed to {@code apply} so far. */
+ final AtomicInteger msgCnt = new AtomicInteger();
+
+ /** Logger; annotated with {@code @LoggerResource}, so presumably injected by Ignite rather than set here. */
+ @LoggerResource
+ private transient IgniteLogger log;
+
+ /** {@inheritDoc} Logs the message together with the current thread name, increments {@code msgCnt} and always returns {@code true}. */
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ log.info(Thread.currentThread().getName() +
+ " Listener received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
- MSG_CNT.incrementAndGet();
+ msgCnt.incrementAndGet();
+
+ return true;
+ }
+ }
+
+
+ /**
+ * Listener that records messages coming from the expected {@code sender} node, flags any other sender in {@code error}, and counts {@code rcvLatch} down on every call.
+ */
+ private static class ReceiveRemoteMessageListener<UUID, Object> implements P2<UUID, Object> {
+ /** Messages accepted from the expected sender, in insertion order; {@code java.lang.Object} is qualified because the type parameter {@code Object} shadows it. */
+ final Collection<java.lang.Object> rcvMsgs = new ConcurrentLinkedDeque<>();
+
+ /** Set to {@code true} once a message arrives from a node other than {@code sender}. */
+ final AtomicBoolean error = new AtomicBoolean(false);
+
+ /** Counted down once per {@code apply} call, whether or not the message was accepted. */
+ final CountDownLatch rcvLatch;
+
+ /** Ignite instance whose local node ID is the only accepted sender ID. */
+ final Ignite sender;
+
+ /** Logger; annotated with {@code @LoggerResource}, so presumably injected by Ignite rather than set here. */
+ @LoggerResource
+ private transient IgniteLogger log;
+
+ /**
+ * @param sender Ignite instance whose local node is the expected sender of every message.
+ * @param latchCount Initial count of {@code rcvLatch}.
+ */
+ public ReceiveRemoteMessageListener(Ignite sender, int latchCount) {
+ this.sender = sender;
+ rcvLatch = new CountDownLatch(latchCount);
+ }
+
+ /** {@inheritDoc} Returns {@code false} and sets {@code error} for an unexpected sender, otherwise stores the message and returns {@code true}. NOTE(review): the unexpected-sender case is logged at info level, not error. */
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ try {
+ log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+
+ if (!nodeId.equals(sender.cluster().localNode().id())) {
+ log.info("Unexpected sender node: " + nodeId);
+
+ error.set(true);
+
+ return false;
+ }
+
+ rcvMsgs.add(msg);
return true;
}
- });
+ finally {
+ rcvLatch.countDown();
+ }
+ }
+ }
- ignite1.message().send(null, MSG_1);
- Thread.sleep(3000);
+ /**
+ * Listener that logs each received message and increments the {@code MSG_CNT} counter defined outside this class. NOTE(review): {@code <UUID, Object>} declares type parameters that shadow the JDK types.
+ */
+ private static class ListenForOldestListener<UUID, Object> implements P2<UUID, Object> {
+ /** Logger; annotated with {@code @LoggerResource}, so presumably injected by Ignite rather than set here. */
+ @LoggerResource
+ private transient IgniteLogger log;
- assertEquals(1, MSG_CNT.get());
+ /** {@inheritDoc} Logs the message, increments {@code MSG_CNT} and always returns {@code true}. */
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+
+ MSG_CNT.incrementAndGet();
+
+ return true;
+ }
+ }
+
+
+ /**
+ * Listener for a single topic that accepts only the expected {@code message} sent by {@code sender}, and asserts that the injected Ignite instance equals {@code receiver}.
+ */
+ private static class ListenWithIntTopic implements P2<UUID, Object> {
+ /** Messages that passed both the sender check and the payload check. */
+ final Collection<java.lang.Object> rcvMsgs = new ConcurrentLinkedDeque<>();
+
+ /** Set to {@code true} on an unexpected sender or an unexpected payload. */
+ final AtomicBoolean error = new AtomicBoolean(false);
+
+ /** Single-count latch, counted down in the {@code finally} block of {@code apply}. */
+ final CountDownLatch rcvLatch = new CountDownLatch(1);
+
+ /** Ignite instance whose local node ID must match the sender of each message. */
+ private final Ignite sender;
+
+ /** Ignite instance that {@code g} is asserted to equal in {@code apply}. */
+ private final Ignite receiver;
+
+ /** Annotated with {@code @IgniteInstanceResource}; presumably the Ignite instance of the node running this listener - TODO confirm. */
+ @IgniteInstanceResource
+ private transient Ignite g;
+
+ /** Logger; annotated with {@code @LoggerResource}, so presumably injected by Ignite rather than set here. */
+ @LoggerResource
+ private transient IgniteLogger log;
+
+ /** Topic reported in log output; this class does not otherwise read it. */
+ final Integer topic;
+
+ /** Payload that every received message must equal. */
+ final String message;
+
+ /**
+ * @param sender Ignite instance whose local node is the expected sender.
+ * @param receiver Ignite instance expected to be injected into {@code g}.
+ * @param topic Topic reported in log output.
+ * @param message Expected message payload.
+ */
+ public ListenWithIntTopic(Ignite sender, Ignite receiver, Integer topic, String message) {
+ this.sender = sender;
+ this.receiver = receiver;
+ this.topic = topic;
+ this.message = message;
+ }
+
+ /** {@inheritDoc} Returns {@code false} and sets {@code error} on a wrong sender or payload; the instance assertion runs before the {@code try}, so if it throws the latch is not counted down. */
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ assertEquals(receiver, g);
+
+ try {
+ log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId +
+ ", topic=" + topic + ']');
+
+ if (!nodeId.equals(sender.cluster().localNode().id())) {
+ log.error("Unexpected sender node: " + nodeId);
+
+ error.set(true);
+
+ return false;
+ }
+
+ if (!message.equals(msg)) {
+ log.error("Unexpected message " + msg + " for topic: " + topic);
+
+ error.set(true);
+
+ return false;
+ }
+
+ rcvMsgs.add(msg);
+
+ return true;
+ }
+ finally {
+ rcvLatch.countDown();
+ }
+ }
}
+
+
+ /**
+ * Listener that stores every received message regardless of sender and counts {@code rcvLatch} down on each call. NOTE(review): {@code <UUID, Object>} declares type parameters that shadow the JDK types.
+ */
+ private static class MessageReceiverListener<UUID, Object> implements P2<UUID, Object> {
+ /** All messages received so far, in insertion order. */
+ final Collection<java.lang.Object> rcvMsgs = new ConcurrentLinkedDeque<>();
+
+ /** Latch with an initial count of 4, counted down once per {@code apply} call. */
+ final CountDownLatch rcvLatch = new CountDownLatch(4);
+
+ /** Logger; annotated with {@code @LoggerResource}, so presumably injected by Ignite rather than set here. */
+ @LoggerResource
+ private transient IgniteLogger log;
+
+ /** {@inheritDoc} Logs and stores the message, counts the latch down in {@code finally}, and always returns {@code true}. */
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ try {
+ log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+
+ rcvMsgs.add(msg);
+
+ return true;
+ }
+ finally {
+ rcvLatch.countDown();
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/490a2526/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index e25aaee..ac3e939 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -1462,7 +1462,7 @@ public final class GridTestUtils {
double dur = (System.currentTimeMillis() - startTime) / 1000d;
System.out.printf("%s:\n operations:%d, duration=%fs, op/s=%d, latency=%fms\n", name, cnt, dur,
- (long)(cnt / dur), dur / cnt);
+ (long)(cnt / dur), dur / cnt);
}
/**
@@ -1495,4 +1495,18 @@ public final class GridTestUtils {
public static String apacheIgniteTestPath() {
return System.getProperty("IGNITE_TEST_PATH", U.getIgniteHome() + "/target/ignite");
}
+
+ /**
+ * Adds test to the suite only if it's not in {@code ignoredTests} set.
+ *
+ * @param suite TestSuite where to place the test.
+ * @param test Test.
+ * @param ignoredTests Tests to ignore; if {@code null}, the test is always added.
+ */
+ public static void addTestIfNeeded(TestSuite suite, Class test, Set<Class> ignoredTests) {
+ if (ignoredTests != null && ignoredTests.contains(test))
+ return;
+
+ suite.addTestSuite(test);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/490a2526/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 939346c..1a642d4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -32,6 +32,8 @@ import org.apache.ignite.messaging.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.testframework.config.*;
+import java.util.*;
+
/**
* Basic test suite.
*/
@@ -41,16 +43,20 @@ public class IgniteBasicTestSuite extends TestSuite {
* @throws Exception Thrown in case of the failure.
*/
public static TestSuite suite() throws Exception {
+ return suite(null);
+ }
+
+ /**
+ * @param ignoredTests Tests to ignore; forwarded to {@code IgniteMarshallerSelfTestSuite.suite(Set)}.
+ * @return Test suite.
+ * @throws Exception Thrown in case of the failure.
+ */
+ public static TestSuite suite(Set<Class> ignoredTests) throws Exception {
TestSuite suite = new TestSuite("Ignite Basic Test Suite");
suite.addTest(IgniteLangSelfTestSuite.suite());
suite.addTest(IgniteUtilSelfTestSuite.suite());
-
- Object marshClass = GridTestProperties.getProperty(GridTestProperties.MARSH_CLASS_NAME);
-
- if (marshClass == null || marshClass.equals(OptimizedMarshaller.class.getName()) ||
- marshClass.equals(JdkMarshaller.class.getName()))
- suite.addTest(IgniteMarshallerSelfTestSuite.suite());
+ suite.addTest(IgniteMarshallerSelfTestSuite.suite(ignoredTests));
suite.addTest(IgniteKernalSelfTestSuite.suite());
suite.addTest(IgniteStartUpTestSuite.suite());
@@ -58,10 +64,10 @@ public class IgniteBasicTestSuite extends TestSuite {
suite.addTest(IgniteP2PSelfTestSuite.suite());
suite.addTest(IgniteCacheP2pUnmarshallingErrorTestSuit.suite());
- suite.addTest(new TestSuite(GridSelfTest.class));
- suite.addTest(new TestSuite(GridProjectionSelfTest.class));
- suite.addTest(new TestSuite(GridMessagingSelfTest.class));
- suite.addTest(new TestSuite(GridMessagingNoPeerClassLoadingSelfTest.class));
+ suite.addTestSuite(GridSelfTest.class);
+ suite.addTestSuite(GridProjectionSelfTest.class);
+ suite.addTestSuite(GridMessagingSelfTest.class);
+ suite.addTestSuite(GridMessagingNoPeerClassLoadingSelfTest.class);
if (U.isLinux() || U.isMacOs())
suite.addTest(IgniteIpcSharedMemorySelfTestSuite.suite());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/490a2526/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java
index 10afe10..40c32a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java
@@ -21,6 +21,9 @@ import junit.framework.*;
import org.apache.ignite.internal.util.io.*;
import org.apache.ignite.marshaller.jdk.*;
import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.*;
/**
* Test suite for all marshallers.
@@ -31,16 +34,25 @@ public class IgniteMarshallerSelfTestSuite extends TestSuite {
* @throws Exception If failed.
*/
public static TestSuite suite() throws Exception {
+ return suite(null);
+ }
+
+ /**
+ * @param ignoredTests Test classes to leave out of the suite; passed to {@code GridTestUtils.addTestIfNeeded} for each candidate test.
+ * @return Test suite.
+ * @throws Exception Thrown in case of the failure.
+ */
+ public static TestSuite suite(Set<Class> ignoredTests) throws Exception {
TestSuite suite = new TestSuite("Ignite Marshaller Test Suite");
- suite.addTest(new TestSuite(GridJdkMarshallerSelfTest.class));
- suite.addTest(new TestSuite(OptimizedMarshallerEnumSelfTest.class));
- suite.addTest(new TestSuite(OptimizedMarshallerSelfTest.class));
- suite.addTest(new TestSuite(OptimizedMarshallerTest.class));
- suite.addTest(new TestSuite(OptimizedObjectStreamSelfTest.class));
- suite.addTest(new TestSuite(GridUnsafeDataOutputArraySizingSelfTest.class));
- suite.addTest(new TestSuite(OptimizedMarshallerNodeFailoverTest.class));
- suite.addTest(new TestSuite(OptimizedMarshallerSerialPersistentFieldsSelfTest.class));
+ GridTestUtils.addTestIfNeeded(suite, GridJdkMarshallerSelfTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, OptimizedMarshallerEnumSelfTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, OptimizedMarshallerSelfTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, OptimizedMarshallerTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, OptimizedObjectStreamSelfTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, GridUnsafeDataOutputArraySizingSelfTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, OptimizedMarshallerNodeFailoverTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, OptimizedMarshallerSerialPersistentFieldsSelfTest.class, ignoredTests);
return suite;
}