You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/11 08:27:57 UTC

incubator-ignite git commit: ignite-471-2: disable a test for portable marshaller

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-471-2 4007a4303 -> d64185b82


ignite-471-2: disable a test for portable marshaller


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d64185b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d64185b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d64185b8

Branch: refs/heads/ignite-471-2
Commit: d64185b82640d994b263a9558909a7b86546743c
Parents: 4007a43
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Jun 11 09:27:46 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Jun 11 09:27:46 2015 +0300

----------------------------------------------------------------------
 .../ignite/messaging/GridMessagingSelfTest.java | 557 ++++++++++---------
 .../ignite/testsuites/IgniteBasicTestSuite.java |   2 +-
 2 files changed, 295 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d64185b8/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
index fb4ac8c..455bc8c 100644
--- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
@@ -75,6 +75,9 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
     /** Shared IP finder. */
     private final transient TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
+    /** */
+    protected static CountDownLatch rcvLatch;
+
     /**
      * A test message topic.
      */
@@ -180,7 +183,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
-        
+
         TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
 
         discoSpi.setIpFinder(ipFinder);
@@ -196,9 +199,34 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
      * @throws Exception If error occurs.
      */
     public void testSendReceiveMessage() throws Exception {
-        ReceiveRemoteMessageListener<UUID, Object> list = new ReceiveRemoteMessageListener<>(ignite2, 3);
+        final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>();
+
+        final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable
+
+        final CountDownLatch rcvLatch = new CountDownLatch(3);
+
+        ignite1.message().localListen(null, new P2<UUID, Object>() {
+            @Override public boolean apply(UUID nodeId, Object msg) {
+                try {
+                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
 
-        ignite1.message().localListen(null, list);
+                    if (!nodeId.equals(ignite2.cluster().localNode().id())) {
+                        log.error("Unexpected sender node: " + nodeId);
+
+                        error.set(true);
+
+                        return false;
+                    }
+
+                    rcvMsgs.add(msg);
+
+                    return true;
+                }
+                finally {
+                    rcvLatch.countDown();
+                }
+            }
+        });
 
         ClusterGroup rNode1 = ignite2.cluster().forRemotes();
 
@@ -206,13 +234,13 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
         message(rNode1).send(null, MSG_2);
         message(rNode1).send(null, MSG_3);
 
-        assertTrue(list.rcvLatch.await(3, TimeUnit.SECONDS));
+        assertTrue(rcvLatch.await(3, TimeUnit.SECONDS));
 
-        assertFalse(list.error.get());
+        assertFalse(error.get());
 
-        assertTrue(list.rcvMsgs.contains(MSG_1));
-        assertTrue(list.rcvMsgs.contains(MSG_2));
-        assertTrue(list.rcvMsgs.contains(MSG_3));
+        assertTrue(rcvMsgs.contains(MSG_1));
+        assertTrue(rcvMsgs.contains(MSG_2));
+        assertTrue(rcvMsgs.contains(MSG_3));
     }
 
     /**
@@ -579,9 +607,24 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
      * @throws Exception If error occurs.
      */
     public void testRemoteListen() throws Exception {
-        MessageReceiverListener list = new MessageReceiverListener();
+        final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>();
 
-        ignite2.message().remoteListen(null, list);
+        rcvLatch = new CountDownLatch(4);
+
+        ignite2.message().remoteListen(null, new P2<UUID, Object>() {
+            @Override public boolean apply(UUID nodeId, Object msg) {
+                try {
+                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+
+                    rcvMsgs.add(msg);
+
+                    return true;
+                }
+                finally {
+                    rcvLatch.countDown();
+                }
+            }
+        });
 
         ClusterGroup prj2 = ignite1.cluster().forRemotes(); // Includes node from grid2.
 
@@ -589,11 +632,11 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
         message(prj2).send(null, MSG_2);
         message(ignite2.cluster().forLocal()).send(null, MSG_3);
 
-        assertFalse(list.rcvLatch.await(3, TimeUnit.SECONDS)); // We should get only 3 message.
+        assertFalse(rcvLatch.await(3, TimeUnit.SECONDS)); // We should get only 3 message.
 
-        assertTrue(list.rcvMsgs.contains(MSG_1));
-        assertTrue(list.rcvMsgs.contains(MSG_2));
-        assertTrue(list.rcvMsgs.contains(MSG_3));
+        assertTrue(rcvMsgs.contains(MSG_1));
+        assertTrue(rcvMsgs.contains(MSG_2));
+        assertTrue(rcvMsgs.contains(MSG_3));
     }
 
     /**
@@ -601,19 +644,45 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
      */
     @SuppressWarnings("TooBroadScope")
     public void testStopRemoteListen() throws Exception {
-        final IncrementTestListener list1 = new IncrementTestListener();
-        final IncrementTestListener list2 = new IncrementTestListener();
-        final IncrementTestListener list3 = new IncrementTestListener();
+        final AtomicInteger msgCnt1 = new AtomicInteger();
+
+        final AtomicInteger msgCnt2 = new AtomicInteger();
+
+        final AtomicInteger msgCnt3 = new AtomicInteger();
 
         final String topic1 = null;
         final String topic2 = "top2";
         final String topic3 = "top3";
 
-        UUID id1 = ignite2.message().remoteListen(topic1, list1);
+        UUID id1 = ignite2.message().remoteListen(topic1, new P2<UUID, Object>() {
+            @Override public boolean apply(UUID nodeId, Object msg) {
+                System.out.println(Thread.currentThread().getName() + " Listener1 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
 
-        UUID id2 = ignite2.message().remoteListen(topic2, list2);
+                msgCnt1.incrementAndGet();
 
-        UUID id3 = ignite2.message().remoteListen(topic3, list3);
+                return true;
+            }
+        });
+
+        UUID id2 = ignite2.message().remoteListen(topic2, new P2<UUID, Object>() {
+            @Override public boolean apply(UUID nodeId, Object msg) {
+                System.out.println(Thread.currentThread().getName() + " Listener2 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+
+                msgCnt2.incrementAndGet();
+
+                return true;
+            }
+        });
+
+        UUID id3 = ignite2.message().remoteListen(topic3, new P2<UUID, Object>() {
+            @Override public boolean apply(UUID nodeId, Object msg) {
+                System.out.println(Thread.currentThread().getName() + " Listener3 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+
+                msgCnt3.incrementAndGet();
+
+                return true;
+            }
+        });
 
         message(ignite1.cluster().forRemotes()).send(topic1, "msg1-1");
         message(ignite1.cluster().forRemotes()).send(topic2, "msg1-2");
@@ -621,13 +690,13 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
 
         GridTestUtils.waitForCondition(new PA() {
             @Override public boolean apply() {
-                return list1.msgCnt.get() > 0 && list2.msgCnt.get() > 0 && list3.msgCnt.get() > 0;
+                return msgCnt1.get() > 0 && msgCnt2.get() > 0 && msgCnt3.get() > 0;
             }
         }, 5000);
 
-        assertEquals(1, list1.msgCnt.get());
-        assertEquals(1, list2.msgCnt.get());
-        assertEquals(1, list3.msgCnt.get());
+        assertEquals(1, msgCnt1.get());
+        assertEquals(1, msgCnt2.get());
+        assertEquals(1, msgCnt3.get());
 
         ignite2.message().stopRemoteListen(id2);
 
@@ -637,13 +706,13 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
 
         GridTestUtils.waitForCondition(new PA() {
             @Override public boolean apply() {
-                return list1.msgCnt.get() > 1 && list3.msgCnt.get() > 1;
+                return msgCnt1.get() > 1 && msgCnt3.get() > 1;
             }
         }, 5000);
 
-        assertEquals(2, list1.msgCnt.get());
-        assertEquals(1, list2.msgCnt.get());
-        assertEquals(2, list3.msgCnt.get());
+        assertEquals(2, msgCnt1.get());
+        assertEquals(1, msgCnt2.get());
+        assertEquals(2, msgCnt3.get());
 
         ignite2.message().stopRemoteListen(id2); // Try remove one more time.
 
@@ -656,9 +725,9 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
 
         U.sleep(1000);
 
-        assertEquals(2, list1.msgCnt.get());
-        assertEquals(1, list2.msgCnt.get());
-        assertEquals(2, list3.msgCnt.get());
+        assertEquals(2, msgCnt1.get());
+        assertEquals(1, msgCnt2.get());
+        assertEquals(2, msgCnt3.get());
     }
 
     /**
@@ -673,21 +742,46 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
             new TestMessage(MSG_2, 3000),
             new TestMessage(MSG_3));
 
-        ReceiveRemoteMessageListener receiver = new ReceiveRemoteMessageListener<>(ignite1, 3);
+        final Collection<Object> rcvMsgs = new ConcurrentLinkedDeque<>();
+
+        final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable
+
+        rcvLatch = new CountDownLatch(3);
+
+        ignite2.message().remoteListen(S_TOPIC_1, new P2<UUID, Object>() {
+            @Override public boolean apply(UUID nodeId, Object msg) {
+                try {
+                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+
+                    if (!nodeId.equals(ignite1.cluster().localNode().id())) {
+                        log.error("Unexpected sender node: " + nodeId);
+
+                        error.set(true);
+
+                        return false;
+                    }
 
-        ignite2.message().remoteListen(S_TOPIC_1, receiver);
+                    rcvMsgs.add(msg);
+
+                    return true;
+                }
+                finally {
+                    rcvLatch.countDown();
+                }
+            }
+        });
 
         ClusterGroup prj2 = ignite1.cluster().forRemotes(); // Includes node from grid2.
 
         for (TestMessage msg : msgs)
             message(prj2).sendOrdered(S_TOPIC_1, msg, 15000);
 
-        assertTrue(receiver.rcvLatch.await(6, TimeUnit.SECONDS));
+        assertTrue(rcvLatch.await(6, TimeUnit.SECONDS));
 
-        assertFalse(receiver.error.get());
+        assertFalse(error.get());
 
         //noinspection AssertEqualsBetweenInconvertibleTypes
-        assertEquals(msgs, Arrays.asList(receiver.rcvMsgs.toArray()));
+        assertEquals(msgs, Arrays.asList(rcvMsgs.toArray()));
     }
 
     /**
@@ -697,17 +791,122 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
      * @throws Exception If error occurs.
      */
     public void testRemoteListenWithIntTopic() throws Exception {
-        ListenWithIntTopic topList1 = new ListenWithIntTopic(ignite1, ignite2, I_TOPIC_1, MSG_1);
+        final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>();
+
+        final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable
+
+        rcvLatch = new CountDownLatch(3);
+
+        ignite2.message().remoteListen(I_TOPIC_1, new P2<UUID, Object>() {
+            @IgniteInstanceResource
+            private transient Ignite g;
+
+            @Override public boolean apply(UUID nodeId, Object msg) {
+                assertEquals(ignite2, g);
+
+                try {
+                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId +
+                        ", topic=" + I_TOPIC_1 + ']');
+
+                    if (!nodeId.equals(ignite1.cluster().localNode().id())) {
+                        log.error("Unexpected sender node: " + nodeId);
+
+                        error.set(true);
+
+                        return false;
+                    }
+
+                    if (!MSG_1.equals(msg)) {
+                        log.error("Unexpected message " + msg + " for topic: " + I_TOPIC_1);
+
+                        error.set(true);
+
+                        return false;
+                    }
+
+                    rcvMsgs.add(msg);
+
+                    return true;
+                }
+                finally {
+                    rcvLatch.countDown();
+                }
+            }
+        });
+
+        ignite2.message().remoteListen(I_TOPIC_2, new P2<UUID, Object>() {
+            @IgniteInstanceResource
+            private transient Ignite g;
+
+            @Override public boolean apply(UUID nodeId, Object msg) {
+                assertEquals(ignite2, g);
+
+                try {
+                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId +
+                        ", topic=" + I_TOPIC_2 + ']');
+
+                    if (!nodeId.equals(ignite1.cluster().localNode().id())) {
+                        log.error("Unexpected sender node: " + nodeId);
+
+                        error.set(true);
+
+                        return false;
+                    }
+
+                    if (!MSG_2.equals(msg)) {
+                        log.error("Unexpected message " + msg + " for topic: " + I_TOPIC_2);
+
+                        error.set(true);
+
+                        return false;
+                    }
+
+                    rcvMsgs.add(msg);
+
+                    return true;
+                }
+                finally {
+                    rcvLatch.countDown();
+                }
+            }
+        });
+
+        ignite2.message().remoteListen(null, new P2<UUID, Object>() {
+            @IgniteInstanceResource
+            private transient Ignite g;
+
+            @Override public boolean apply(UUID nodeId, Object msg) {
+                assertEquals(ignite2, g);
+
+                try {
+                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId +
+                        ", topic=default]");
+
+                    if (!nodeId.equals(ignite1.cluster().localNode().id())) {
+                        log.error("Unexpected sender node: " + nodeId);
 
-        ListenWithIntTopic topList2 = new ListenWithIntTopic(ignite1, ignite2, I_TOPIC_2, MSG_2);
+                        error.set(true);
 
-        ListenWithIntTopic topList3 = new ListenWithIntTopic(ignite1, ignite2, null, MSG_3);
+                        return false;
+                    }
 
-        ignite2.message().remoteListen(I_TOPIC_1, topList1);
+                    if (!MSG_3.equals(msg)) {
+                        log.error("Unexpected message " + msg + " for topic: default");
 
-        ignite2.message().remoteListen(I_TOPIC_2, topList2);
+                        error.set(true);
 
-        ignite2.message().remoteListen(null, topList3);
+                        return false;
+                    }
+
+                    rcvMsgs.add(msg);
+
+                    return true;
+                }
+                finally {
+                    rcvLatch.countDown();
+                }
+            }
+        });
 
         ClusterGroup prj2 = ignite1.cluster().forRemotes(); // Includes node from grid2.
 
@@ -715,17 +914,13 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
         message(prj2).send(I_TOPIC_2, MSG_2);
         message(prj2).send(null, MSG_3);
 
-        assertTrue(topList1.rcvLatch.await(3, TimeUnit.SECONDS));
-        assertTrue(topList2.rcvLatch.await(3, TimeUnit.SECONDS));
-        assertTrue(topList3.rcvLatch.await(3, TimeUnit.SECONDS));
+        assertTrue(rcvLatch.await(3, TimeUnit.SECONDS));
 
-        assertFalse(topList1.error.get());
-        assertFalse(topList2.error.get());
-        assertFalse(topList3.error.get());
+        assertFalse(error.get());
 
-        assertTrue(topList1.rcvMsgs.contains(MSG_1));
-        assertTrue(topList2.rcvMsgs.contains(MSG_2));
-        assertTrue(topList3.rcvMsgs.contains(MSG_3));
+        assertTrue(rcvMsgs.contains(MSG_1));
+        assertTrue(rcvMsgs.contains(MSG_2));
+        assertTrue(rcvMsgs.contains(MSG_3));
     }
 
     /**
@@ -741,15 +936,36 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
 
         Class rcCls = extLdr.loadClass(EXT_RESOURCE_CLS_NAME);
 
-        ReceiveRemoteMessageListener list = new ReceiveRemoteMessageListener(ignite1, 1);
+        final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable
+
+        final CountDownLatch rcvLatch = new CountDownLatch(1);
 
-        ignite2.message().remoteListen(S_TOPIC_1, list);
+        ignite2.message().remoteListen(S_TOPIC_1, new P2<UUID, Object>() {
+            @Override public boolean apply(UUID nodeId, Object msg) {
+                try {
+                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+
+                    if (!nodeId.equals(ignite1.cluster().localNode().id())) {
+                        log.error("Unexpected sender node: " + nodeId);
+
+                        error.set(true);
+
+                        return false;
+                    }
+
+                    return true;
+                }
+                finally {
+                    rcvLatch.countDown();
+                }
+            }
+        });
 
         message(ignite1.cluster().forRemotes()).send(S_TOPIC_1, Collections.singleton(rcCls.newInstance()));
 
-        assertTrue(list.rcvLatch.await(3, TimeUnit.SECONDS));
+        assertTrue(rcvLatch.await(3, TimeUnit.SECONDS));
 
-        assertFalse(list.error.get());
+        assertFalse(error.get());
     }
 
     /**
@@ -796,6 +1012,8 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
      * @throws Exception If failed.
      */
     public void testAsync() throws Exception {
+        final AtomicInteger msgCnt = new AtomicInteger();
+
         assertFalse(ignite2.message().isAsync());
 
         final IgniteMessaging msg = ignite2.message().withAsync();
@@ -804,8 +1022,6 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
 
         assertFalse(ignite2.message().isAsync());
 
-        final IncrementTestListener list = new IncrementTestListener();
-
         GridTestUtils.assertThrows(log, new Callable<Void>() {
             @Override public Void call() throws Exception {
                 msg.future();
@@ -816,7 +1032,16 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
 
         final String topic = "topic";
 
-        UUID id = msg.remoteListen(topic, list);
+        UUID id = msg.remoteListen(topic, new P2<UUID, Object>() {
+            @Override public boolean apply(UUID nodeId, Object msg) {
+                System.out.println(Thread.currentThread().getName() +
+                    " Listener received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+
+                msgCnt.incrementAndGet();
+
+                return true;
+            }
+        });
 
         Assert.assertNull(id);
 
@@ -840,11 +1065,11 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
 
         GridTestUtils.waitForCondition(new PA() {
             @Override public boolean apply() {
-                return list.msgCnt.get() > 0;
+                return msgCnt.get() > 0;
             }
         }, 5000);
 
-        assertEquals(1, list.msgCnt.get());
+        assertEquals(1, msgCnt.get());
 
         msg.stopRemoteListen(id);
 
@@ -866,7 +1091,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
 
         U.sleep(1000);
 
-        assertEquals(1, list.msgCnt.get());
+        assertEquals(1, msgCnt.get());
     }
 
     /**
@@ -890,7 +1115,6 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
 
     /**
      * @param expOldestIgnite Expected oldest ignite.
-     * @throws InterruptedException If interrupted.
      */
     private void remoteListenForOldest(Ignite expOldestIgnite) throws InterruptedException {
         ClusterGroup grp = ignite1.cluster().forOldest();
@@ -898,213 +1122,20 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
         assertEquals(1, grp.nodes().size());
         assertEquals(expOldestIgnite.cluster().localNode().id(), grp.node().id());
 
-        ignite1.message(grp).remoteListen(null, new ListenForOldestListener<UUID, Object>());
-
-        ignite1.message().send(null, MSG_1);
-
-        Thread.sleep(3000);
-
-        assertEquals(1, MSG_CNT.get());
-    }
-
-    /**
-     *
-     */
-    private static class IncrementTestListener<UUID, Object> implements P2<UUID, Object> {
-        /** */
-        final AtomicInteger msgCnt = new AtomicInteger();
-
-        /** */
-        @LoggerResource
-        private transient IgniteLogger log;
-
-        /** {@inheritDoc} */
-        @Override public boolean apply(UUID nodeId, Object msg) {
-            log.info("Listener received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
-
-            msgCnt.incrementAndGet();
-
-            return true;
-        }
-    }
-
-    /**
-     *
-     */
-    private static class ReceiveRemoteMessageListener<UUID, Object> implements P2<UUID, Object> {
-        /** */
-        final Collection<java.lang.Object> rcvMsgs = new ConcurrentLinkedDeque<>();
-
-        /** */
-        final AtomicBoolean error = new AtomicBoolean(false);
-
-        /** */
-        final CountDownLatch rcvLatch;
-
-        /** */
-        final Ignite sender;
-
-        /** */
-        @LoggerResource
-        private transient IgniteLogger  log;
-
-        /**
-         * @param sender
-         * @param latchCount
-         */
-        public ReceiveRemoteMessageListener(Ignite sender, int latchCount) {
-            this.sender = sender;
-            rcvLatch = new CountDownLatch(latchCount);
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean apply(UUID nodeId, Object msg) {
-            try {
-                log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
-
-                if (!nodeId.equals(sender.cluster().localNode().id())) {
-                    log.info("Unexpected sender node: " + nodeId);
-
-                    error.set(true);
-
-                    return false;
-                }
-
-                rcvMsgs.add(msg);
-
-                return true;
-            }
-            finally {
-                rcvLatch.countDown();
-            }
-        }
-    }
-
-
-    /**
-     *
-     */
-    private static class ListenForOldestListener<UUID, Object> implements P2<UUID, Object> {
-        /** */
-        @LoggerResource
-        private transient IgniteLogger log;
-
-        /** {@inheritDoc} */
-        @Override public boolean apply(UUID nodeId, Object msg) {
-            log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
-
-            MSG_CNT.incrementAndGet();
-
-            return true;
-        }
-    }
-
-    /**
-     *
-     */
-    private static class ListenWithIntTopic implements P2<UUID, Object> {
-        /** */
-        final Collection<java.lang.Object> rcvMsgs = new ConcurrentLinkedDeque<>();
-
-        /** */
-        final AtomicBoolean error = new AtomicBoolean(false);
-
-        /** */
-        final CountDownLatch rcvLatch = new CountDownLatch(1);
-
-        /** */
-        private final Ignite sender;
-
-        /** */
-        private final Ignite receiver;
-
-        /** */
-        @IgniteInstanceResource
-        private transient Ignite g;
-
-        /** */
-        @LoggerResource
-        private transient IgniteLogger log;
-
-        /** */
-        final Integer topic;
-
-        /** */
-        final String message;
-
-        /**
-         * @param sender
-         * @param receiver
-         * @param topic
-         * @param message
-         */
-        public ListenWithIntTopic(Ignite sender, Ignite receiver, Integer topic, String message) {
-            this.sender = sender;
-            this.receiver = receiver;
-            this.topic = topic;
-            this.message = message;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean apply(UUID nodeId, Object msg) {
-            assertEquals(receiver, g);
-
-            try {
-                log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId +
-                             ", topic=" + topic + ']');
-
-                if (!nodeId.equals(sender.cluster().localNode().id())) {
-                    log.error("Unexpected sender node: " + nodeId);
-
-                    error.set(true);
-
-                    return false;
-                }
-
-                if (!message.equals(msg)) {
-                    log.error("Unexpected message " + msg + " for topic: " + topic);
-
-                    error.set(true);
-
-                    return false;
-                }
+        ignite1.message(grp).remoteListen(null, new P2<UUID, Object>() {
+            @Override public boolean apply(UUID nodeId, Object msg) {
+                System.out.println("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
 
-                rcvMsgs.add(msg);
+                MSG_CNT.incrementAndGet();
 
                 return true;
             }
-            finally {
-                rcvLatch.countDown();
-            }
-        }
-    }
-
-    /**
-     *
-     */
-    private static class MessageReceiverListener<UUID, Object> implements P2<UUID, Object> {
-        /** */
-        final Collection<java.lang.Object> rcvMsgs = new ConcurrentLinkedDeque<>();
-
-        /** */
-        final CountDownLatch rcvLatch = new CountDownLatch(4);
-
-        /** */
-        @LoggerResource
-        private transient IgniteLogger log;
+        });
 
-        /** {@inheritDoc} */
-        @Override public boolean apply(UUID nodeId, Object msg) {
-            try {
-                log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+        ignite1.message().send(null, MSG_1);
 
-                rcvMsgs.add(msg);
+        Thread.sleep(3000);
 
-                return true;
-            }
-            finally {
-                rcvLatch.countDown();
-            }
-        }
+        assertEquals(1, MSG_CNT.get());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d64185b8/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 8c061be..949b76d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -66,7 +66,7 @@ public class IgniteBasicTestSuite extends TestSuite {
 
         suite.addTest(new TestSuite(GridSelfTest.class));
         GridTestUtils.addTestIfNeeded(suite, GridProjectionSelfTest.class, ignoredTests);
-        suite.addTest(new TestSuite(GridMessagingSelfTest.class));
+        GridTestUtils.addTestIfNeeded(suite, GridMessagingSelfTest.class, ignoredTests);
         suite.addTest(new TestSuite(IgniteMessagingWithClientTest.class));
         GridTestUtils.addTestIfNeeded(suite, GridMessagingNoPeerClassLoadingSelfTest.class, ignoredTests);