You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/27 11:09:47 UTC

[52/63] [abbrv] ignite git commit: IGNITE-3807 IgniteSpiContext registers message listeners incorrectly

IGNITE-3807 IgniteSpiContext registers message listeners incorrectly


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5ff9e5e8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5ff9e5e8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5ff9e5e8

Branch: refs/heads/ignite-comm-opts2-fix
Commit: 5ff9e5e8c95ea0fe37d281a7ff46e148469f952a
Parents: bbc9758
Author: Saikat Maitra <sa...@gmail.com>
Authored: Mon Sep 26 15:28:51 2016 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Mon Sep 26 15:39:23 2016 -0700

----------------------------------------------------------------------
 .../internal/managers/GridManagerAdapter.java   |  15 ++
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  11 +
 .../org/apache/ignite/spi/IgniteSpiContext.java |  26 ++-
 ...GridManagerLocalMessageListenerSelfTest.java | 222 +++++++++++++++++++
 .../testframework/GridSpiTestContext.java       | 115 +++++++++-
 .../ignite/testsuites/IgniteSpiTestSuite.java   |   6 +-
 6 files changed, 391 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5ff9e5e8/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 4fe8ca8..584cc56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -396,6 +397,20 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                         }
                     }
 
+                    @Override public void addLocalMessageListener(Object topic, IgniteBiPredicate<UUID, ?> p) {
+                        A.notNull(topic, "topic");
+                        A.notNull(p, "p");
+
+                        ctx.io().addUserMessageListener(topic, p);
+                    }
+
+                    @Override public void removeLocalMessageListener(Object topic, IgniteBiPredicate<UUID, ?> p) {
+                        A.notNull(topic, "topic");
+                        A.notNull(topic, "p");
+
+                        ctx.io().removeUserMessageListener(topic, p);
+                    }
+
                     @SuppressWarnings("deprecation")
                     @Override public void addMessageListener(GridMessageListener lsnr, String topic) {
                         A.notNull(lsnr, "lsnr");

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ff9e5e8/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 65b0420..219d07b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
@@ -759,6 +760,11 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         }
 
         /** {@inheritDoc} */
+        @Override public void addLocalMessageListener(Object topic, IgniteBiPredicate<UUID, ?> p) {
+            /* No-op. */
+        }
+
+        /** {@inheritDoc} */
         @Override public void recordEvent(Event evt) {
             /* No-op. */
         }
@@ -849,6 +855,11 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         }
 
         /** {@inheritDoc} */
+        @Override public void removeLocalMessageListener(Object topic, IgniteBiPredicate<UUID, ?> p) {
+             /* No-op. */
+        }
+
+        /** {@inheritDoc} */
         @Override public boolean removeMessageListener(GridMessageListener lsnr, String topic) {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ff9e5e8/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index 899c222..5eb5227 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -26,6 +26,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
 import org.apache.ignite.plugin.security.SecuritySubject;
@@ -116,6 +117,23 @@ public interface IgniteSpiContext {
     public void send(ClusterNode node, Serializable msg, String topic) throws IgniteSpiException;
 
     /**
+     * Register an local message listener to receive messages sent by remote nodes. The underlying
+     * communication mechanism is defined by {@link org.apache.ignite.spi.communication.CommunicationSpi} implementation used.
+     *
+     * @param topic Topic to subscribe to.
+     * @param p Message predicate.
+     */
+    public void addLocalMessageListener(@Nullable Object topic, IgniteBiPredicate<UUID, ?> p);
+
+    /**
+     * Removes a previously registered local message listener.
+     *
+     * @param topic Topic to unsubscribe from.
+     * @param p Message predicate.
+     */
+    public void removeLocalMessageListener(@Nullable Object topic, IgniteBiPredicate<UUID, ?> p);
+
+    /**
      * Register a message listener to receive messages sent by remote nodes. The underlying
      * communication mechanism is defined by {@link org.apache.ignite.spi.communication.CommunicationSpi} implementation used.
      * <p>
@@ -124,7 +142,10 @@ public interface IgniteSpiContext {
      *
      * @param lsnr Message listener to register.
      * @param topic Topic to register listener for.
+     *
+     * @deprecated Use {@link #addLocalMessageListener(Object, IgniteBiPredicate)} instead.
      */
+    @Deprecated
     public void addMessageListener(GridMessageListener lsnr, String topic);
 
     /**
@@ -134,7 +155,10 @@ public interface IgniteSpiContext {
      * @param topic Topic to unregister listener for.
      * @return {@code true} of message listener was removed, {@code false} if it was not
      *      previously registered.
+     *
+     * @deprecated Use {@link #removeLocalMessageListener(Object, IgniteBiPredicate)} instead.
      */
+    @Deprecated
     public boolean removeMessageListener(GridMessageListener lsnr, String topic);
 
     /**
@@ -328,4 +352,4 @@ public interface IgniteSpiContext {
      * @param c Timeout object.
      */
     public void removeTimeoutObject(IgniteSpiTimeoutObject c);
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ff9e5e8/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
new file mode 100644
index 0000000..4aadc78
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
+import org.apache.ignite.internal.util.typedef.CO;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpi;
+import org.apache.ignite.spi.IgniteSpiAdapter;
+import org.apache.ignite.spi.IgniteSpiContext;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * Test Managers to add and remove local message listener.
+ */
+public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final byte DIRECT_TYPE = (byte)210;
+
+    static {
+        GridIoMessageFactory.registerCustom(DIRECT_TYPE, new CO<Message>() {
+            @Override public Message apply() {
+                return new GridIoUserMessage();
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        c.setDiscoverySpi(discoSpi);
+
+        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+        c.setCommunicationSpi(commSpi);
+
+        return c;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSendMessage() throws Exception {
+        startGridsMultiThreaded(2);
+
+        IgniteSpiContext ctx0 = ((IgniteSpiAdapter)grid(0).context().io().getSpi()).getSpiContext();
+        IgniteSpiContext ctx1 = ((IgniteSpiAdapter)grid(1).context().io().getSpi()).getSpiContext();
+
+        String topic = "test-topic";
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        ctx1.addLocalMessageListener(topic, new IgniteBiPredicate<UUID, Object>() {
+            @Override public boolean apply(UUID nodeId, Object msg) {
+                assertEquals("Message", msg);
+
+                latch.countDown();
+
+                return true;
+            }
+        });
+
+        long time = System.nanoTime();
+
+        ctx0.send(grid(1).localNode(), "Message", topic);
+
+        assert latch.await(3, SECONDS);
+
+        time = System.nanoTime() - time;
+
+        info(">>>");
+        info(">>> send() time (ms): " + MILLISECONDS.convert(time, NANOSECONDS));
+        info(">>>");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAddLocalMessageListener() throws Exception {
+        startGrid();
+
+        Manager mgr = new Manager(grid().context(), new Spi());
+
+        mgr.start();
+
+        mgr.onKernalStart();
+
+        assertTrue(mgr.enabled());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRemoveLocalMessageListener() throws Exception {
+        startGrid();
+
+        Manager mgr = new Manager(grid().context(), new Spi());
+
+        assertTrue(mgr.enabled());
+
+        mgr.onKernalStart();
+
+        mgr.onKernalStop(false);
+
+        mgr.stop(false);
+
+        assertTrue(mgr.enabled());
+    }
+
+    /** */
+    private static class Manager extends GridManagerAdapter<IgniteSpi> {
+        /**
+         * @param ctx Kernal context.
+         * @param spis Specific SPI instance.
+         */
+        protected Manager(GridKernalContext ctx, IgniteSpi... spis) {
+            super(ctx, spis);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void start() throws IgniteCheckedException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void stop(boolean cancel) throws IgniteCheckedException {
+            // No-op.
+        }
+    }
+
+    /**
+     * Test Spi.
+     */
+    private static interface TestSpi extends IgniteSpi {
+        // No-op.
+    }
+
+    /**
+     * Spi
+     */
+    private static class Spi extends IgniteSpiAdapter implements TestSpi {
+        /** Ignite Spi Context. **/
+        private IgniteSpiContext spiCtx;
+
+        /** Test message topic. **/
+        private String TEST_TOPIC = "test_topic";
+
+        /** {@inheritDoc} */
+        @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void spiStop() throws IgniteSpiException {
+            // No-op.
+        }
+
+        @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
+            this.spiCtx = spiCtx;
+
+            spiCtx.addLocalMessageListener(TEST_TOPIC, new IgniteBiPredicate<UUID, Object>() {
+                @Override public boolean apply(UUID uuid, Object o) {
+                    return true;
+                }
+            });
+
+        }
+
+        @Override public void onContextDestroyed0() {
+            spiCtx.removeLocalMessageListener(TEST_TOPIC, new IgniteBiPredicate<UUID, Object>() {
+                @Override public boolean apply(UUID uuid, Object o) {
+                    return true;
+                }
+            });
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ff9e5e8/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 0bffe8b..ac50ef9 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -29,21 +29,26 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.TaskEvent;
 import org.apache.ignite.internal.ClusterMetricsSnapshot;
+import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.direct.DirectMessageReader;
 import org.apache.ignite.internal.direct.DirectMessageWriter;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
@@ -61,6 +66,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
+import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
 
 /**
  * Test SPI context.
@@ -241,7 +247,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
      * @param nodeId Node ID.
      */
     public void removeNode(UUID nodeId) {
-        for (Iterator<ClusterNode> iter = rmtNodes.iterator(); iter.hasNext();) {
+        for (Iterator<ClusterNode> iter = rmtNodes.iterator(); iter.hasNext(); ) {
             ClusterNode node = iter.next();
 
             if (node.id().equals(nodeId)) {
@@ -327,6 +333,27 @@ public class GridSpiTestContext implements IgniteSpiContext {
 
     /** {@inheritDoc} */
     @SuppressWarnings("deprecation")
+    @Override public void addLocalMessageListener(Object topic, IgniteBiPredicate<UUID, ?> p) {
+        try {
+            addMessageListener(TOPIC_COMM_USER,
+                new GridLocalMessageListener(topic, (IgniteBiPredicate<UUID, Object>)p));
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param topic Listener's topic.
+     * @param lsnr Listener to add.
+     */
+    @SuppressWarnings({"TypeMayBeWeakened", "deprecation"})
+    public void addMessageListener(GridTopic topic, GridMessageListener lsnr) {
+        addMessageListener(lsnr, ((Object)topic).toString());
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("deprecation")
     @Override public void addMessageListener(GridMessageListener lsnr, String topic) {
         msgLsnrs.add(lsnr);
     }
@@ -337,6 +364,28 @@ public class GridSpiTestContext implements IgniteSpiContext {
         return msgLsnrs.remove(lsnr);
     }
 
+    /** {@inheritDoc} */
+    @SuppressWarnings("deprecation")
+    @Override public void removeLocalMessageListener(Object topic, IgniteBiPredicate<UUID, ?> p) {
+        try {
+            removeMessageListener(TOPIC_COMM_USER,
+                new GridLocalMessageListener(topic, (IgniteBiPredicate<UUID, Object>)p));
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param topic Listener's topic.
+     * @param lsnr Listener to remove.
+     * @return Whether or not the lsnr was removed.
+     */
+    @SuppressWarnings("deprecation")
+    public boolean removeMessageListener(GridTopic topic, @Nullable GridMessageListener lsnr) {
+        return removeMessageListener(lsnr, ((Object)topic).toString());
+    }
+
     /**
      * @param type Event type.
      * @param taskName Task name.
@@ -471,7 +520,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
         boolean res = false;
 
         try {
-            res =  get(cacheName, key) != null;
+            res = get(cacheName, key) != null;
         }
         catch (IgniteException ignored) {
 
@@ -587,4 +636,66 @@ public class GridSpiTestContext implements IgniteSpiContext {
             this.obj = obj;
         }
     }
+
+    /**
+     * This class represents a message listener wrapper that knows about peer deployment.
+     */
+    private class GridLocalMessageListener implements GridMessageListener {
+        /** Predicate listeners. */
+        private final IgniteBiPredicate<UUID, Object> predLsnr;
+
+        /** User message topic. */
+        private final Object topic;
+
+        /**
+         * @param topic User topic.
+         * @param predLsnr Predicate listener.
+         * @throws IgniteCheckedException If failed to inject resources to predicates.
+         */
+        GridLocalMessageListener(@Nullable Object topic, @Nullable IgniteBiPredicate<UUID, Object> predLsnr)
+            throws IgniteCheckedException {
+            this.topic = topic;
+            this.predLsnr = predLsnr;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings({
+            "SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions",
+            "OverlyStrongTypeCast"})
+        @Override public void onMessage(UUID nodeId, Object msg) {
+            GridIoUserMessage ioMsg = (GridIoUserMessage)msg;
+
+            ClusterNode node = locNode;
+                Object msgBody = ioMsg.body();
+
+                assert msgBody != null || ioMsg.bodyBytes() != null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            GridLocalMessageListener l = (GridLocalMessageListener)o;
+
+            return F.eq(predLsnr, l.predLsnr) && F.eq(topic, l.topic);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = predLsnr != null ? predLsnr.hashCode() : 0;
+
+            res = 31 * res + (topic != null ? topic.hashCode() : 0);
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(GridLocalMessageListener.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ff9e5e8/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiTestSuite.java
index fc4023a..90f1218 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.internal.managers.GridManagerLocalMessageListenerSelfTest;
 import org.apache.ignite.internal.managers.GridNoopManagerSelfTest;
 
 /**
@@ -64,6 +65,9 @@ public class IgniteSpiTestSuite extends TestSuite {
         // All other tests.
         suite.addTestSuite(GridNoopManagerSelfTest.class);
 
+        // Local Message Listener tests.
+        suite.addTestSuite(GridManagerLocalMessageListenerSelfTest.class);
+
         return suite;
     }
-}
\ No newline at end of file
+}