You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/20 12:02:35 UTC

[18/19] incubator-ignite git commit: ignite-471-2: GridQueryProcessor fix and other test suites fixes

ignite-471-2: GridQueryProcessor fix and other test suites fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/490a2526
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/490a2526
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/490a2526

Branch: refs/heads/ignite-471-2
Commit: 490a2526906e7d22b1e20da2436f664393dc65f5
Parents: 379c73f
Author: Denis Magda <dm...@gridgain.com>
Authored: Wed May 20 12:38:23 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed May 20 12:38:23 2015 +0300

----------------------------------------------------------------------
 .../ignite/messaging/GridMessagingSelfTest.java | 559 +++++++++----------
 .../ignite/testframework/GridTestUtils.java     |  16 +-
 .../ignite/testsuites/IgniteBasicTestSuite.java |  26 +-
 .../IgniteMarshallerSelfTestSuite.java          |  28 +-
 4 files changed, 317 insertions(+), 312 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/490a2526/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
index c033750..36c2a9f 100644
--- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
@@ -24,7 +24,6 @@ import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.optimized.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
@@ -182,8 +181,6 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        ((OptimizedMarshaller)cfg.getMarshaller()).setRequireSerializable(false);
-
         TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
 
         discoSpi.setIpFinder(ipFinder);
@@ -199,34 +196,9 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
      * @throws Exception If error occurs.
      */
     public void testSendReceiveMessage() throws Exception {
-        final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>();
-
-        final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable
-
-        final CountDownLatch rcvLatch = new CountDownLatch(3);
-
-        ignite1.message().localListen(null, new P2<UUID, Object>() {
-            @Override public boolean apply(UUID nodeId, Object msg) {
-                try {
-                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
-
-                    if (!nodeId.equals(ignite2.cluster().localNode().id())) {
-                        log.error("Unexpected sender node: " + nodeId);
+        ReceiveRemoteMessageListener<UUID, Object> list = new ReceiveRemoteMessageListener<>(ignite2, 3);
 
-                        error.set(true);
-
-                        return false;
-                    }
-
-                    rcvMsgs.add(msg);
-
-                    return true;
-                }
-                finally {
-                    rcvLatch.countDown();
-                }
-            }
-        });
+        ignite1.message().localListen(null, list);
 
         ClusterGroup rNode1 = ignite2.cluster().forRemotes();
 
@@ -234,13 +206,13 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
         message(rNode1).send(null, MSG_2);
         message(rNode1).send(null, MSG_3);
 
-        assertTrue(rcvLatch.await(3, TimeUnit.SECONDS));
+        assertTrue(list.rcvLatch.await(3, TimeUnit.SECONDS));
 
-        assertFalse(error.get());
+        assertFalse(list.error.get());
 
-        assertTrue(rcvMsgs.contains(MSG_1));
-        assertTrue(rcvMsgs.contains(MSG_2));
-        assertTrue(rcvMsgs.contains(MSG_3));
+        assertTrue(list.rcvMsgs.contains(MSG_1));
+        assertTrue(list.rcvMsgs.contains(MSG_2));
+        assertTrue(list.rcvMsgs.contains(MSG_3));
     }
 
     /**
@@ -607,24 +579,9 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
      * @throws Exception If error occurs.
      */
     public void testRemoteListen() throws Exception {
-        final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>();
-
-        final CountDownLatch rcvLatch = new CountDownLatch(4);
+        MessageReceiverListener list = new MessageReceiverListener();
 
-        ignite2.message().remoteListen(null, new P2<UUID, Object>() {
-            @Override public boolean apply(UUID nodeId, Object msg) {
-                try {
-                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
-
-                    rcvMsgs.add(msg);
-
-                    return true;
-                }
-                finally {
-                    rcvLatch.countDown();
-                }
-            }
-        });
+        ignite2.message().remoteListen(null, list);
 
         ClusterGroup prj2 = ignite1.cluster().forRemotes(); // Includes node from grid2.
 
@@ -632,11 +589,11 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
         message(prj2).send(null, MSG_2);
         message(ignite2.cluster().forLocal()).send(null, MSG_3);
 
-        assertFalse(rcvLatch.await(3, TimeUnit.SECONDS)); // We should get only 3 message.
+        assertFalse(list.rcvLatch.await(3, TimeUnit.SECONDS)); // We should get only 3 message.
 
-        assertTrue(rcvMsgs.contains(MSG_1));
-        assertTrue(rcvMsgs.contains(MSG_2));
-        assertTrue(rcvMsgs.contains(MSG_3));
+        assertTrue(list.rcvMsgs.contains(MSG_1));
+        assertTrue(list.rcvMsgs.contains(MSG_2));
+        assertTrue(list.rcvMsgs.contains(MSG_3));
     }
 
     /**
@@ -644,45 +601,19 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
      */
     @SuppressWarnings("TooBroadScope")
     public void testStopRemoteListen() throws Exception {
-        final AtomicInteger msgCnt1 = new AtomicInteger();
-
-        final AtomicInteger msgCnt2 = new AtomicInteger();
-
-        final AtomicInteger msgCnt3 = new AtomicInteger();
+        final IncrementTestListener list1 = new IncrementTestListener();
+        final IncrementTestListener list2 = new IncrementTestListener();
+        final IncrementTestListener list3 = new IncrementTestListener();
 
         final String topic1 = null;
         final String topic2 = "top2";
         final String topic3 = "top3";
 
-        UUID id1 = ignite2.message().remoteListen(topic1, new P2<UUID, Object>() {
-            @Override public boolean apply(UUID nodeId, Object msg) {
-                System.out.println(Thread.currentThread().getName() + " Listener1 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
-
-                msgCnt1.incrementAndGet();
-
-                return true;
-            }
-        });
-
-        UUID id2 = ignite2.message().remoteListen(topic2, new P2<UUID, Object>() {
-            @Override public boolean apply(UUID nodeId, Object msg) {
-                System.out.println(Thread.currentThread().getName() + " Listener2 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
-
-                msgCnt2.incrementAndGet();
-
-                return true;
-            }
-        });
-
-        UUID id3 = ignite2.message().remoteListen(topic3, new P2<UUID, Object>() {
-            @Override public boolean apply(UUID nodeId, Object msg) {
-                System.out.println(Thread.currentThread().getName() + " Listener3 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+        UUID id1 = ignite2.message().remoteListen(topic1, list1);
 
-                msgCnt3.incrementAndGet();
+        UUID id2 = ignite2.message().remoteListen(topic2, list2);
 
-                return true;
-            }
-        });
+        UUID id3 = ignite2.message().remoteListen(topic3, list3);
 
         message(ignite1.cluster().forRemotes()).send(topic1, "msg1-1");
         message(ignite1.cluster().forRemotes()).send(topic2, "msg1-2");
@@ -690,13 +621,13 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
 
         GridTestUtils.waitForCondition(new PA() {
             @Override public boolean apply() {
-                return msgCnt1.get() > 0 && msgCnt2.get() > 0 && msgCnt3.get() > 0;
+                return list1.msgCnt.get() > 0 && list2.msgCnt.get() > 0 && list3.msgCnt.get() > 0;
             }
         }, 5000);
 
-        assertEquals(1, msgCnt1.get());
-        assertEquals(1, msgCnt2.get());
-        assertEquals(1, msgCnt3.get());
+        assertEquals(1, list1.msgCnt.get());
+        assertEquals(1, list2.msgCnt.get());
+        assertEquals(1, list3.msgCnt.get());
 
         ignite2.message().stopRemoteListen(id2);
 
@@ -706,13 +637,13 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
 
         GridTestUtils.waitForCondition(new PA() {
             @Override public boolean apply() {
-                return msgCnt1.get() > 1 && msgCnt3.get() > 1;
+                return list1.msgCnt.get() > 1 && list3.msgCnt.get() > 1;
             }
         }, 5000);
 
-        assertEquals(2, msgCnt1.get());
-        assertEquals(1, msgCnt2.get());
-        assertEquals(2, msgCnt3.get());
+        assertEquals(2, list1.msgCnt.get());
+        assertEquals(1, list2.msgCnt.get());
+        assertEquals(2, list3.msgCnt.get());
 
         ignite2.message().stopRemoteListen(id2); // Try remove one more time.
 
@@ -725,9 +656,9 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
 
         U.sleep(1000);
 
-        assertEquals(2, msgCnt1.get());
-        assertEquals(1, msgCnt2.get());
-        assertEquals(2, msgCnt3.get());
+        assertEquals(2, list1.msgCnt.get());
+        assertEquals(1, list2.msgCnt.get());
+        assertEquals(2, list3.msgCnt.get());
     }
 
     /**
@@ -742,46 +673,21 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
             new TestMessage(MSG_2, 3000),
             new TestMessage(MSG_3));
 
-        final Collection<Object> rcvMsgs = new ConcurrentLinkedDeque<>();
+        ReceiveRemoteMessageListener receiver = new ReceiveRemoteMessageListener<>(ignite1, 3);
 
-        final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable
-
-        final CountDownLatch rcvLatch = new CountDownLatch(3);
-
-        ignite2.message().remoteListen(S_TOPIC_1, new P2<UUID, Object>() {
-            @Override public boolean apply(UUID nodeId, Object msg) {
-                try {
-                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
-
-                    if (!nodeId.equals(ignite1.cluster().localNode().id())) {
-                        log.error("Unexpected sender node: " + nodeId);
-
-                        error.set(true);
-
-                        return false;
-                    }
-
-                    rcvMsgs.add(msg);
-
-                    return true;
-                }
-                finally {
-                    rcvLatch.countDown();
-                }
-            }
-        });
+        ignite2.message().remoteListen(S_TOPIC_1, receiver);
 
         ClusterGroup prj2 = ignite1.cluster().forRemotes(); // Includes node from grid2.
 
         for (TestMessage msg : msgs)
             message(prj2).sendOrdered(S_TOPIC_1, msg, 15000);
 
-        assertTrue(rcvLatch.await(6, TimeUnit.SECONDS));
+        assertTrue(receiver.rcvLatch.await(6, TimeUnit.SECONDS));
 
-        assertFalse(error.get());
+        assertFalse(receiver.error.get());
 
         //noinspection AssertEqualsBetweenInconvertibleTypes
-        assertEquals(msgs, Arrays.asList(rcvMsgs.toArray()));
+        assertEquals(msgs, Arrays.asList(receiver.rcvMsgs.toArray()));
     }
 
     /**
@@ -791,122 +697,17 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
      * @throws Exception If error occurs.
      */
     public void testRemoteListenWithIntTopic() throws Exception {
-        final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>();
-
-        final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable
-
-        final CountDownLatch rcvLatch = new CountDownLatch(3);
-
-        ignite2.message().remoteListen(I_TOPIC_1, new P2<UUID, Object>() {
-            @IgniteInstanceResource
-            private transient Ignite g;
-
-            @Override public boolean apply(UUID nodeId, Object msg) {
-                assertEquals(ignite2, g);
-
-                try {
-                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId +
-                        ", topic=" + I_TOPIC_1 + ']');
-
-                    if (!nodeId.equals(ignite1.cluster().localNode().id())) {
-                        log.error("Unexpected sender node: " + nodeId);
-
-                        error.set(true);
-
-                        return false;
-                    }
-
-                    if (!MSG_1.equals(msg)) {
-                        log.error("Unexpected message " + msg + " for topic: " + I_TOPIC_1);
-
-                        error.set(true);
-
-                        return false;
-                    }
-
-                    rcvMsgs.add(msg);
-
-                    return true;
-                }
-                finally {
-                    rcvLatch.countDown();
-                }
-            }
-        });
-
-        ignite2.message().remoteListen(I_TOPIC_2, new P2<UUID, Object>() {
-            @IgniteInstanceResource
-            private transient Ignite g;
-
-            @Override public boolean apply(UUID nodeId, Object msg) {
-                assertEquals(ignite2, g);
-
-                try {
-                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId +
-                        ", topic=" + I_TOPIC_2 + ']');
-
-                    if (!nodeId.equals(ignite1.cluster().localNode().id())) {
-                        log.error("Unexpected sender node: " + nodeId);
-
-                        error.set(true);
-
-                        return false;
-                    }
-
-                    if (!MSG_2.equals(msg)) {
-                        log.error("Unexpected message " + msg + " for topic: " + I_TOPIC_2);
-
-                        error.set(true);
-
-                        return false;
-                    }
-
-                    rcvMsgs.add(msg);
-
-                    return true;
-                }
-                finally {
-                    rcvLatch.countDown();
-                }
-            }
-        });
-
-        ignite2.message().remoteListen(null, new P2<UUID, Object>() {
-            @IgniteInstanceResource
-            private transient Ignite g;
-
-            @Override public boolean apply(UUID nodeId, Object msg) {
-                assertEquals(ignite2, g);
-
-                try {
-                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId +
-                        ", topic=default]");
+        ListenWithIntTopic topList1 = new ListenWithIntTopic(ignite1, ignite2, I_TOPIC_1, MSG_1);
 
-                    if (!nodeId.equals(ignite1.cluster().localNode().id())) {
-                        log.error("Unexpected sender node: " + nodeId);
-
-                        error.set(true);
-
-                        return false;
-                    }
-
-                    if (!MSG_3.equals(msg)) {
-                        log.error("Unexpected message " + msg + " for topic: default");
+        ListenWithIntTopic topList2 = new ListenWithIntTopic(ignite1, ignite2, I_TOPIC_2, MSG_2);
 
-                        error.set(true);
+        ListenWithIntTopic topList3 = new ListenWithIntTopic(ignite1, ignite2, null, MSG_3);
 
-                        return false;
-                    }
+        ignite2.message().remoteListen(I_TOPIC_1, topList1);
 
-                    rcvMsgs.add(msg);
+        ignite2.message().remoteListen(I_TOPIC_2, topList2);
 
-                    return true;
-                }
-                finally {
-                    rcvLatch.countDown();
-                }
-            }
-        });
+        ignite2.message().remoteListen(null, topList3);
 
         ClusterGroup prj2 = ignite1.cluster().forRemotes(); // Includes node from grid2.
 
@@ -914,13 +715,17 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
         message(prj2).send(I_TOPIC_2, MSG_2);
         message(prj2).send(null, MSG_3);
 
-        assertTrue(rcvLatch.await(3, TimeUnit.SECONDS));
+        assertTrue(topList1.rcvLatch.await(3, TimeUnit.SECONDS));
+        assertTrue(topList2.rcvLatch.await(3, TimeUnit.SECONDS));
+        assertTrue(topList3.rcvLatch.await(3, TimeUnit.SECONDS));
 
-        assertFalse(error.get());
+        assertFalse(topList1.error.get());
+        assertFalse(topList2.error.get());
+        assertFalse(topList3.error.get());
 
-        assertTrue(rcvMsgs.contains(MSG_1));
-        assertTrue(rcvMsgs.contains(MSG_2));
-        assertTrue(rcvMsgs.contains(MSG_3));
+        assertTrue(topList1.rcvMsgs.contains(MSG_1));
+        assertTrue(topList2.rcvMsgs.contains(MSG_2));
+        assertTrue(topList3.rcvMsgs.contains(MSG_3));
     }
 
     /**
@@ -936,36 +741,15 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
 
         Class rcCls = extLdr.loadClass(EXT_RESOURCE_CLS_NAME);
 
-        final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable
-
-        final CountDownLatch rcvLatch = new CountDownLatch(1);
-
-        ignite2.message().remoteListen(S_TOPIC_1, new P2<UUID, Object>() {
-            @Override public boolean apply(UUID nodeId, Object msg) {
-                try {
-                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
-
-                    if (!nodeId.equals(ignite1.cluster().localNode().id())) {
-                        log.error("Unexpected sender node: " + nodeId);
+        ReceiveRemoteMessageListener list = new ReceiveRemoteMessageListener(ignite1, 1);
 
-                        error.set(true);
-
-                        return false;
-                    }
-
-                    return true;
-                }
-                finally {
-                    rcvLatch.countDown();
-                }
-            }
-        });
+        ignite2.message().remoteListen(S_TOPIC_1, list);
 
         message(ignite1.cluster().forRemotes()).send(S_TOPIC_1, Collections.singleton(rcCls.newInstance()));
 
-        assertTrue(rcvLatch.await(3, TimeUnit.SECONDS));
+        assertTrue(list.rcvLatch.await(3, TimeUnit.SECONDS));
 
-        assertFalse(error.get());
+        assertFalse(list.error.get());
     }
 
     /**
@@ -1012,8 +796,6 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testAsync() throws Exception {
-        final AtomicInteger msgCnt = new AtomicInteger();
-
         assertFalse(ignite2.message().isAsync());
 
         final IgniteMessaging msg = ignite2.message().withAsync();
@@ -1022,6 +804,8 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
 
         assertFalse(ignite2.message().isAsync());
 
+        final IncrementTestListener list = new IncrementTestListener();
+
         GridTestUtils.assertThrows(log, new Callable<Void>() {
             @Override public Void call() throws Exception {
                 msg.future();
@@ -1032,16 +816,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
 
         final String topic = "topic";
 
-        UUID id = msg.remoteListen(topic, new P2<UUID, Object>() {
-            @Override public boolean apply(UUID nodeId, Object msg) {
-                System.out.println(Thread.currentThread().getName() +
-                    " Listener received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
-
-                msgCnt.incrementAndGet();
-
-                return true;
-            }
-        });
+        UUID id = msg.remoteListen(topic, list);
 
         Assert.assertNull(id);
 
@@ -1065,11 +840,11 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
 
         GridTestUtils.waitForCondition(new PA() {
             @Override public boolean apply() {
-                return msgCnt.get() > 0;
+                return list.msgCnt.get() > 0;
             }
         }, 5000);
 
-        assertEquals(1, msgCnt.get());
+        assertEquals(1, list.msgCnt.get());
 
         msg.stopRemoteListen(id);
 
@@ -1091,7 +866,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
 
         U.sleep(1000);
 
-        assertEquals(1, msgCnt.get());
+        assertEquals(1, list.msgCnt.get());
     }
 
     /**
@@ -1122,20 +897,218 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest {
         assertEquals(1, grp.nodes().size());
         assertEquals(expOldestIgnite.cluster().localNode().id(), grp.node().id());
 
-        ignite1.message(grp).remoteListen(null, new P2<UUID, Object>() {
-            @Override public boolean apply(UUID nodeId, Object msg) {
-                System.out.println("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+        ignite1.message(grp).remoteListen(null, new ListenForOldestListener<UUID, Object>());
+
+        ignite1.message().send(null, MSG_1);
+
+        Thread.sleep(3000);
+
+        assertEquals(1, MSG_CNT.get());
+    }
+
+    /**
+     * Messaging predicate that counts its invocations; {@code UUID} and {@code Object} are type parameters here and shadow the JDK types.
+     */
+    private static class IncrementTestListener<UUID, Object> implements P2<UUID, Object> {
+        /** Number of times {@link #apply} has been called on this instance. */
+        final AtomicInteger msgCnt = new AtomicInteger();
+
+        /** Logger; never assigned in this class, expected to be injected through {@link LoggerResource}. */
+        @LoggerResource
+        private transient IgniteLogger log;
+
+        /** Logs the message with the current thread name, increments {@link #msgCnt} and always returns {@code true}. */
+        @Override public boolean apply(UUID nodeId, Object msg) {
+            log.info(Thread.currentThread().getName() +
+                         " Listener received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
 
-                MSG_CNT.incrementAndGet();
+            msgCnt.incrementAndGet();
+
+            return true;
+        }
+    }
+
+
+    /**
+     * Messaging predicate that records messages coming from one expected sender node and flags any other sender as an error.
+     */
+    private static class ReceiveRemoteMessageListener<UUID, Object> implements P2<UUID, Object> {
+        /** Messages accepted by {@link #apply}, in arrival order. */
+        final Collection<java.lang.Object> rcvMsgs = new ConcurrentLinkedDeque<>();
+
+        /** Set to {@code true} once a message from an unexpected sender has been seen. */
+        final AtomicBoolean error = new AtomicBoolean(false);
+
+        /** Counted down once per {@link #apply} call, whether the message was accepted or rejected. */
+        final CountDownLatch rcvLatch;
+
+        /** Node whose local node ID every message is expected to carry as its sender ID. */
+        final Ignite sender;
+
+        /** Logger; never assigned in this class, expected to be injected through {@link LoggerResource}. */
+        @LoggerResource
+        private transient IgniteLogger  log;
+
+        /**
+         * @param sender Node the messages are expected to come from.
+         * @param latchCount Initial count of {@link #rcvLatch}, i.e. the number of {@link #apply} calls to wait for.
+         */
+        public ReceiveRemoteMessageListener(Ignite sender, int latchCount) {
+            this.sender = sender;
+            rcvLatch = new CountDownLatch(latchCount);
+        }
+
+        /** Accepts and stores messages from {@link #sender}; any other sender sets {@link #error} and returns {@code false}. */
+        @Override public boolean apply(UUID nodeId, Object msg) {
+            try {
+                log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+
+                if (!nodeId.equals(sender.cluster().localNode().id())) {
+                    log.error("Unexpected sender node: " + nodeId); // A wrong sender fails the test, so report it at error level.
+
+                    error.set(true);
+
+                    return false;
+                }
+
+                rcvMsgs.add(msg);
 
                 return true;
             }
-        });
+            finally {
+                rcvLatch.countDown();
+            }
+        }
+    }
 
-        ignite1.message().send(null, MSG_1);
 
-        Thread.sleep(3000);
+    /**
+     * Messaging predicate that counts received messages in the enclosing test's {@code MSG_CNT} counter.
+     */
+    private static class ListenForOldestListener<UUID, Object> implements P2<UUID, Object> {
+        /** Logger; never assigned in this class, expected to be injected through {@link LoggerResource}. */
+        @LoggerResource
+        private transient IgniteLogger log;
 
-        assertEquals(1, MSG_CNT.get());
+        /** Logs the message, increments {@code MSG_CNT} and always returns {@code true}. */
+        @Override public boolean apply(UUID nodeId, Object msg) {
+            log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+
+            MSG_CNT.incrementAndGet();
+
+            return true;
+        }
+    }
+
+
+    /**
+     * Messaging predicate that accepts exactly one expected message from one expected sender and checks which node it runs on.
+     */
+    private static class ListenWithIntTopic implements P2<UUID, Object> {
+        /** Messages accepted by {@link #apply}. */
+        final Collection<java.lang.Object> rcvMsgs = new ConcurrentLinkedDeque<>();
+
+        /** Set to {@code true} when the sender or the message body is not the expected one. */
+        final AtomicBoolean error = new AtomicBoolean(false);
+
+        /** Single-count latch, counted down at the end of every {@link #apply} call that gets past the node check. */
+        final CountDownLatch rcvLatch = new CountDownLatch(1);
+
+        /** Node whose local node ID every message is expected to carry as its sender ID. */
+        private final Ignite sender;
+
+        /** Node this listener is expected to run on; compared with {@link #g} in {@link #apply}. */
+        private final Ignite receiver;
+
+        /** Ignite instance; never assigned in this class, expected to be injected through {@link IgniteInstanceResource}. */
+        @IgniteInstanceResource
+        private transient Ignite g;
+
+        /** Logger; never assigned in this class, expected to be injected through {@link LoggerResource}. */
+        @LoggerResource
+        private transient IgniteLogger log;
+
+        /** Topic; used only in this listener's log messages. */
+        final Integer topic;
+
+        /** Message body that {@link #apply} expects to receive. */
+        final String message;
+
+        /**
+         * @param sender Node the messages are expected to come from.
+         * @param receiver Node the listener is expected to be executed on.
+         * @param topic Topic to print in log messages.
+         * @param message Expected message body.
+         */
+        public ListenWithIntTopic(Ignite sender, Ignite receiver, Integer topic, String message) {
+            this.sender = sender;
+            this.receiver = receiver;
+            this.topic = topic;
+            this.message = message;
+        }
+
+        /** Stores the message and returns {@code true} only if both sender and body match; otherwise sets {@link #error}. */
+        @Override public boolean apply(UUID nodeId, Object msg) {
+            assertEquals(receiver, g);
+
+            try {
+                log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId +
+                             ", topic=" + topic + ']');
+
+                if (!nodeId.equals(sender.cluster().localNode().id())) {
+                    log.error("Unexpected sender node: " + nodeId);
+
+                    error.set(true);
+
+                    return false;
+                }
+
+                if (!message.equals(msg)) {
+                    log.error("Unexpected message " + msg + " for topic: " + topic);
+
+                    error.set(true);
+
+                    return false;
+                }
+
+                rcvMsgs.add(msg);
+
+                return true;
+            }
+            finally {
+                rcvLatch.countDown();
+            }
+        }
     }
+
+
+    /**
+     * Messaging predicate that stores every message it is given, without checking the sender.
+     */
+    private static class MessageReceiverListener<UUID, Object> implements P2<UUID, Object> {
+        /** Messages passed to {@link #apply}, in arrival order. */
+        final Collection<java.lang.Object> rcvMsgs = new ConcurrentLinkedDeque<>();
+
+        /** Latch with an initial count of 4, counted down once per {@link #apply} call. */
+        final CountDownLatch rcvLatch = new CountDownLatch(4);
+
+        /** Logger; never assigned in this class, expected to be injected through {@link LoggerResource}. */
+        @LoggerResource
+        private transient IgniteLogger log;
+
+        /** Logs and stores the message, counts down {@link #rcvLatch} and always returns {@code true}. */
+        @Override public boolean apply(UUID nodeId, Object msg) {
+            try {
+                log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+
+                rcvMsgs.add(msg);
+
+                return true;
+            }
+            finally {
+                rcvLatch.countDown();
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/490a2526/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index e25aaee..ac3e939 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -1462,7 +1462,7 @@ public final class GridTestUtils {
         double dur = (System.currentTimeMillis() - startTime) / 1000d;
 
         System.out.printf("%s:\n operations:%d, duration=%fs, op/s=%d, latency=%fms\n", name, cnt, dur,
-            (long)(cnt / dur), dur / cnt);
+                          (long)(cnt / dur), dur / cnt);
     }
 
     /**
@@ -1495,4 +1495,18 @@ public final class GridTestUtils {
     public static String apacheIgniteTestPath() {
         return System.getProperty("IGNITE_TEST_PATH", U.getIgniteHome() + "/target/ignite");
     }
+
+    /**
+     * Adds test to the suite only if it's not in {@code ignoredTests} set.
+     *
+     * @param suite TestSuite where to place the test.
+     * @param test Test class to add.
+     * @param ignoredTests Tests to ignore; if {@code null}, the test is always added.
+     */
+    public static void addTestIfNeeded(TestSuite suite, Class test, Set<Class> ignoredTests) {
+        if (ignoredTests != null && ignoredTests.contains(test))
+            return;
+
+        suite.addTestSuite(test);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/490a2526/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 939346c..1a642d4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -32,6 +32,8 @@ import org.apache.ignite.messaging.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.testframework.config.*;
 
+import java.util.*;
+
 /**
  * Basic test suite.
  */
@@ -41,16 +43,20 @@ public class IgniteBasicTestSuite extends TestSuite {
      * @throws Exception Thrown in case of the failure.
      */
     public static TestSuite suite() throws Exception {
+        return suite(null);
+    }
+
+    /**
+     * @param ignoredTests Tests to ignore; passed on to the marshaller test suite.
+     * @return Test suite.
+     * @throws Exception Thrown in case of the failure.
+     */
+    public static TestSuite suite(Set<Class> ignoredTests) throws Exception {
         TestSuite suite = new TestSuite("Ignite Basic Test Suite");
 
         suite.addTest(IgniteLangSelfTestSuite.suite());
         suite.addTest(IgniteUtilSelfTestSuite.suite());
-
-        Object marshClass = GridTestProperties.getProperty(GridTestProperties.MARSH_CLASS_NAME);
-
-        if (marshClass == null || marshClass.equals(OptimizedMarshaller.class.getName()) ||
-            marshClass.equals(JdkMarshaller.class.getName()))
-            suite.addTest(IgniteMarshallerSelfTestSuite.suite());
+        suite.addTest(IgniteMarshallerSelfTestSuite.suite(ignoredTests));
 
         suite.addTest(IgniteKernalSelfTestSuite.suite());
         suite.addTest(IgniteStartUpTestSuite.suite());
@@ -58,10 +64,10 @@ public class IgniteBasicTestSuite extends TestSuite {
         suite.addTest(IgniteP2PSelfTestSuite.suite());
         suite.addTest(IgniteCacheP2pUnmarshallingErrorTestSuit.suite());
 
-        suite.addTest(new TestSuite(GridSelfTest.class));
-        suite.addTest(new TestSuite(GridProjectionSelfTest.class));
-        suite.addTest(new TestSuite(GridMessagingSelfTest.class));
-        suite.addTest(new TestSuite(GridMessagingNoPeerClassLoadingSelfTest.class));
+        suite.addTestSuite(GridSelfTest.class);
+        suite.addTestSuite(GridProjectionSelfTest.class);
+        suite.addTestSuite(GridMessagingSelfTest.class);
+        suite.addTestSuite(GridMessagingNoPeerClassLoadingSelfTest.class);
 
         if (U.isLinux() || U.isMacOs())
             suite.addTest(IgniteIpcSharedMemorySelfTestSuite.suite());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/490a2526/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java
index 10afe10..40c32a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java
@@ -21,6 +21,9 @@ import junit.framework.*;
 import org.apache.ignite.internal.util.io.*;
 import org.apache.ignite.marshaller.jdk.*;
 import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.*;
 
 /**
  * Test suite for all marshallers.
@@ -31,16 +34,25 @@ public class IgniteMarshallerSelfTestSuite extends TestSuite {
      * @throws Exception If failed.
      */
     public static TestSuite suite() throws Exception {
+        return suite(null);
+    }
+
+    /**
+     * @param ignoredTests Tests that must not be added to the suite; {@code null} adds all of them.
+     * @return Test suite.
+     * @throws Exception Thrown in case of the failure.
+     */
+    public static TestSuite suite(Set<Class> ignoredTests) throws Exception {
         TestSuite suite = new TestSuite("Ignite Marshaller Test Suite");
 
-        suite.addTest(new TestSuite(GridJdkMarshallerSelfTest.class));
-        suite.addTest(new TestSuite(OptimizedMarshallerEnumSelfTest.class));
-        suite.addTest(new TestSuite(OptimizedMarshallerSelfTest.class));
-        suite.addTest(new TestSuite(OptimizedMarshallerTest.class));
-        suite.addTest(new TestSuite(OptimizedObjectStreamSelfTest.class));
-        suite.addTest(new TestSuite(GridUnsafeDataOutputArraySizingSelfTest.class));
-        suite.addTest(new TestSuite(OptimizedMarshallerNodeFailoverTest.class));
-        suite.addTest(new TestSuite(OptimizedMarshallerSerialPersistentFieldsSelfTest.class));
+        GridTestUtils.addTestIfNeeded(suite, GridJdkMarshallerSelfTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, OptimizedMarshallerEnumSelfTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, OptimizedMarshallerSelfTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, OptimizedMarshallerTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, OptimizedObjectStreamSelfTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, GridUnsafeDataOutputArraySizingSelfTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, OptimizedMarshallerNodeFailoverTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, OptimizedMarshallerSerialPersistentFieldsSelfTest.class, ignoredTests);
 
         return suite;
     }