You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/07/20 10:47:30 UTC

[ignite-3] branch main updated: IGNITE-14983 Enable registering message listeners for message groups (#229)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3adc3d7  IGNITE-14983 Enable registering message listeners for message groups (#229)
3adc3d7 is described below

commit 3adc3d7f167d2e7a2548211867e5d498d5cec7e3
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Tue Jul 20 13:47:21 2021 +0300

    IGNITE-14983 Enable registering message listeners for message groups (#229)
---
 .../matchers}/CompletableFutureMatcher.java        |  9 +-
 .../internal/metastorage/MetaStorageManager.java   |  2 +-
 .../ignite/network/AbstractMessagingService.java   | 48 +++++++++--
 .../apache/ignite/network/MessagingService.java    |  8 +-
 modules/network/pom.xml                            |  7 ++
 .../scalecube/ITScaleCubeNetworkMessagingTest.java | 95 +++++++++++++++++++++-
 .../scalecube/ScaleCubeMessagingService.java       |  2 +-
 .../MessageSerializationRegistryImplTest.java      | 28 ++++---
 .../apache/ignite/network/TestMessageTypes.java    |  3 +
 .../internal/raft/server/impl/RaftServerImpl.java  | 44 +++++-----
 .../apache/ignite/raft/jraft/rpc/RpcServer.java    |  5 +-
 .../raft/jraft/rpc/impl/IgniteRpcServer.java       | 54 ++++++++----
 .../ignite/raft/jraft/rpc/TestIgniteRpcServer.java |  4 +
 .../PersistencePropertiesVaultServiceTest.java     |  2 +-
 .../ignite/internal/vault/VaultManagerTest.java    |  2 +-
 .../ignite/internal/vault/VaultServiceTest.java    |  2 +-
 16 files changed, 245 insertions(+), 70 deletions(-)

diff --git a/modules/vault/src/test/java/org/apache/ignite/internal/vault/CompletableFutureMatcher.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
similarity index 87%
rename from modules/vault/src/test/java/org/apache/ignite/internal/vault/CompletableFutureMatcher.java
rename to modules/core/src/test/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
index ecff5bc..bfb958c 100644
--- a/modules/vault/src/test/java/org/apache/ignite/internal/vault/CompletableFutureMatcher.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.vault;
+package org.apache.ignite.internal.testframework.matchers;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -58,6 +58,13 @@ public class CompletableFutureMatcher<T> extends TypeSafeMatcher<CompletableFutu
         description.appendText("is ").appendDescriptionOf(matcher);
     }
 
+    /** {@inheritDoc} */
+    @Override protected void describeMismatchSafely(CompletableFuture<T> item, Description mismatchDescription) {
+        Object valueDescription = item.isDone() ? item.join() : item;
+
+        mismatchDescription.appendText("was ").appendValue(valueDescription);
+    }
+
     /**
      * Factory method.
      *
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 697b213..a2af938 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -174,7 +174,7 @@ public class MetaStorageManager {
 //        );
 
         // TODO: IGNITE-14414 Cluster initialization flow. Here we should complete metaStorageServiceFuture.
-        clusterNetSvc.messagingService().addMessageHandler((message, senderAddr, correlationId) -> {});
+//        clusterNetSvc.messagingService().addMessageHandler((message, senderAddr, correlationId) -> {});
     }
 
     /**
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/AbstractMessagingService.java b/modules/network-api/src/main/java/org/apache/ignite/network/AbstractMessagingService.java
index 61046da..33df0ed 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/AbstractMessagingService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/AbstractMessagingService.java
@@ -17,26 +17,58 @@
 
 package org.apache.ignite.network;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.ignite.network.annotations.MessageGroup;
 
 /**
  * Base class for {@link MessagingService} implementations.
  */
 public abstract class AbstractMessagingService implements MessagingService {
-    /** */
-    private final Collection<NetworkMessageHandler> messageHandlers = new CopyOnWriteArrayList<>();
+    /** Mapping from group type (array index) to a list of registered message handlers. */
+    private final AtomicReferenceArray<List<NetworkMessageHandler>> handlersByGroupType =
+        new AtomicReferenceArray<>(Short.MAX_VALUE + 1);
 
     /** {@inheritDoc} */
-    @Override public void addMessageHandler(NetworkMessageHandler handler) {
-        messageHandlers.add(handler);
+    @Override public void addMessageHandler(Class<?> messageGroup, NetworkMessageHandler handler) {
+        handlersByGroupType.getAndUpdate(getMessageGroupType(messageGroup), handlers -> {
+            if (handlers == null)
+                return List.of(handler);
+
+            var result = new ArrayList<NetworkMessageHandler>(handlers.size() + 1);
+
+            result.addAll(handlers);
+            result.add(handler);
+
+            return result;
+        });
+    }
+
+    /**
+     * Extracts the message group ID from a class annotated with {@link MessageGroup}.
+     */
+    private static short getMessageGroupType(Class<?> messageGroup) {
+        MessageGroup annotation = messageGroup.getAnnotation(MessageGroup.class);
+
+        assert annotation != null : "No MessageGroup annotation present on " + messageGroup;
+
+        short groupType = annotation.groupType();
+
+        assert groupType >= 0 : "Group type must not be negative";
+
+        return groupType;
     }
 
     /**
      * @return registered message handlers.
      */
-    public Collection<NetworkMessageHandler> getMessageHandlers() {
-        return Collections.unmodifiableCollection(messageHandlers);
+    protected final Collection<NetworkMessageHandler> getMessageHandlers(short groupType) {
+        assert groupType >= 0 : "Group type must not be negative";
+
+        List<NetworkMessageHandler> result = handlersByGroupType.get(groupType);
+
+        return result == null ? List.of() : result;
     }
 }
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
index 56e0b77..c524856 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.network;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.network.annotations.MessageGroup;
 
 /**
  * Entry point for sending messages between network members in both weak and patient mode.
@@ -90,9 +91,12 @@ public interface MessagingService {
     CompletableFuture<NetworkMessage> invoke(NetworkAddress addr, NetworkMessage msg, long timeout);
 
     /**
-     * Registers a handler for network message events.
+     * Registers a listener for a group of network message events.
+     * <p>
+     * Message group is specified by providing a class annotated with the {@link MessageGroup} annotation.
      *
+     * @param messageGroup Message group descriptor.
      * @param handler Message handler.
      */
-    void addMessageHandler(NetworkMessageHandler handler);
+    void addMessageHandler(Class<?> messageGroup, NetworkMessageHandler handler);
 }
diff --git a/modules/network/pom.xml b/modules/network/pom.xml
index cedd484..0fd718b 100644
--- a/modules/network/pom.xml
+++ b/modules/network/pom.xml
@@ -67,6 +67,13 @@
 
         <!-- Test dependencies -->
         <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-library</artifactId>
             <scope>test</scope>
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
index 058a013..a2c78da 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.ignite.network.scalecube;
 
+import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.Collection;
@@ -29,21 +30,27 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import io.scalecube.cluster.ClusterImpl;
 import io.scalecube.cluster.transport.api.Transport;
+import org.apache.ignite.internal.network.NetworkMessageTypes;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.ClusterServiceFactory;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.TestMessage;
 import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
+import org.apache.ignite.network.TestMessageTypes;
 import org.apache.ignite.network.TestMessagesFactory;
 import org.apache.ignite.network.TopologyEventHandler;
+import org.apache.ignite.network.annotations.MessageGroup;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import reactor.core.publisher.Mono;
 
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -79,6 +86,7 @@ class ITScaleCubeNetworkMessagingTest {
 
         for (ClusterService member : testCluster.members) {
             member.messagingService().addMessageHandler(
+                TestMessageTypes.class,
                 (message, senderAddr, correlationId) -> {
                     messageStorage.put(member.localConfiguration().getName(), (TestMessage)message);
                     messageReceivedLatch.countDown();
@@ -106,6 +114,7 @@ class ITScaleCubeNetworkMessagingTest {
 
     /**
      * Tests graceful shutdown.
+     *
      * @throws Exception If failed.
      */
     @Test
@@ -151,6 +160,7 @@ class ITScaleCubeNetworkMessagingTest {
         var dataFuture = new CompletableFuture<Data>();
 
         member.messagingService().addMessageHandler(
+            TestMessageTypes.class,
             (message, senderAddr, correlationId) ->
                 dataFuture.complete(new Data((TestMessage)message, senderAddr, correlationId))
         );
@@ -182,10 +192,13 @@ class ITScaleCubeNetworkMessagingTest {
         var requestMessage = messageFactory.testMessage().msg("request").build();
         var responseMessage = messageFactory.testMessage().msg("response").build();
 
-        member.messagingService().addMessageHandler((message, senderAddr, correlationId) -> {
-            if (message.equals(requestMessage))
-                member.messagingService().send(self, responseMessage, correlationId);
-        });
+        member.messagingService().addMessageHandler(
+            TestMessageTypes.class,
+            (message, senderAddr, correlationId) -> {
+                if (message.equals(requestMessage))
+                    member.messagingService().send(self, responseMessage, correlationId);
+            }
+        );
 
         TestMessage actualResponseMessage = member.messagingService()
             .invoke(self, requestMessage, 1000)
@@ -196,7 +209,79 @@ class ITScaleCubeNetworkMessagingTest {
     }
 
     /**
+     * Serializable message that belongs to the {@link NetworkMessageTypes} message group.
+     */
+    private static class MockNetworkMessage implements NetworkMessage, Serializable {
+        /** {@inheritDoc} */
+        @Override public short messageType() {
+            return 666;
+        }
+
+        /** {@inheritDoc} */
+        @Override public short groupType() {
+            return NetworkMessageTypes.class.getAnnotation(MessageGroup.class).groupType();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            return getClass() == obj.getClass();
+        }
+    }
+
+    /**
+     * Tests that messages from different message groups can be delivered to different sets of handlers.
+     */
+    @Test
+    public void testMessageGroupsHandlers() throws Exception {
+        testCluster = new Cluster(2);
+        testCluster.startAwait();
+
+        ClusterService node1 = testCluster.members.get(0);
+        ClusterService node2 = testCluster.members.get(1);
+
+        var testMessageFuture1 = new CompletableFuture<NetworkMessage>();
+        var testMessageFuture2 = new CompletableFuture<NetworkMessage>();
+        var networkMessageFuture = new CompletableFuture<NetworkMessage>();
+
+        // register multiple handlers for the same group
+        node1.messagingService().addMessageHandler(
+            TestMessageTypes.class,
+            (message, senderAddr, correlationId) -> assertTrue(testMessageFuture1.complete(message))
+        );
+
+        node1.messagingService().addMessageHandler(
+            TestMessageTypes.class,
+            (message, senderAddr, correlationId) -> assertTrue(testMessageFuture2.complete(message))
+        );
+
+        // register a different handle for the second group
+        node1.messagingService().addMessageHandler(
+            NetworkMessageTypes.class,
+            (message, senderAddr, correlationId) -> assertTrue(networkMessageFuture.complete(message))
+        );
+
+        var testMessage = messageFactory.testMessage().msg("foo").build();
+
+        var networkMessage = new MockNetworkMessage();
+
+        // test that a message gets delivered to both handlers
+        node2.messagingService()
+            .send(node1.topologyService().localMember(), testMessage)
+            .get(1, TimeUnit.SECONDS);
+
+        // test that a message from the other group is only delivered to a single handler
+        node2.messagingService()
+            .send(node1.topologyService().localMember(), networkMessage)
+            .get(1, TimeUnit.SECONDS);
+
+        assertThat(testMessageFuture1, willBe(equalTo(testMessage)));
+        assertThat(testMessageFuture2, willBe(equalTo(testMessage)));
+        assertThat(networkMessageFuture, willBe(equalTo(networkMessage)));
+    }
+
+    /**
      * Tests shutdown.
+     *
      * @param forceful Whether shutdown should be forceful.
      * @throws Exception If failed.
      */
@@ -238,6 +323,7 @@ class ITScaleCubeNetworkMessagingTest {
 
     /**
      * Find the cluster's transport and force it to stop.
+     *
      * @param cluster Cluster to be shutdown.
      * @throws Exception If failed to stop.
      */
@@ -319,6 +405,7 @@ class ITScaleCubeNetworkMessagingTest {
 
         /**
          * Start and wait for cluster to come up.
+         *
          * @throws InterruptedException If failed.
          */
         void startAwait() throws InterruptedException {
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
index 9910eb7..cb9e185 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
@@ -60,7 +60,7 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
 
         String correlationId = message.correlationId();
 
-        for (NetworkMessageHandler handler : getMessageHandlers())
+        for (NetworkMessageHandler handler : getMessageHandlers(msg.groupType()))
             handler.onReceived(msg, address, correlationId);
     }
 
diff --git a/modules/network/src/test/java/org/apache/ignite/network/MessageSerializationRegistryImplTest.java b/modules/network/src/test/java/org/apache/ignite/network/MessageSerializationRegistryImplTest.java
index b7e40fa..105911f 100644
--- a/modules/network/src/test/java/org/apache/ignite/network/MessageSerializationRegistryImplTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/network/MessageSerializationRegistryImplTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.network;
 
 import org.apache.ignite.network.serialization.MessageDeserializer;
 import org.apache.ignite.network.serialization.MessageSerializationFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.network.serialization.MessageSerializer;
 import org.junit.jupiter.api.Test;
 
@@ -30,13 +31,14 @@ import static org.mockito.Mockito.mock;
  * {@link MessageSerializationRegistryImpl} tests.
  */
 public class MessageSerializationRegistryImplTest {
+    /** Default registry implementation. */
+    private final MessageSerializationRegistry registry = new MessageSerializationRegistryImpl();
+
     /**
      * Tests that a serialization factory can be registered.
      */
     @Test
     public void testRegisterFactory() {
-        var registry = new MessageSerializationRegistryImpl();
-
         registry.registerFactory(Msg.GROUP_TYPE, Msg.TYPE, new MsgSerializationFactory());
     }
 
@@ -46,8 +48,6 @@ public class MessageSerializationRegistryImplTest {
      */
     @Test
     public void testRegisterFactoryWithSameType() {
-        var registry = new MessageSerializationRegistryImpl();
-
         registry.registerFactory(Msg.GROUP_TYPE, Msg.TYPE, new MsgSerializationFactory());
 
         assertThrows(
@@ -62,8 +62,6 @@ public class MessageSerializationRegistryImplTest {
      */
     @Test
     public void testRegisterFactoryWithSameTypeDifferentModule() {
-        var registry = new MessageSerializationRegistryImpl();
-
         registry.registerFactory(Msg.GROUP_TYPE, Msg.TYPE, new MsgSerializationFactory());
 
         short nextGroupType = Msg.GROUP_TYPE + 1;
@@ -80,8 +78,6 @@ public class MessageSerializationRegistryImplTest {
      */
     @Test
     public void testCreateSerializers() {
-        var registry = new MessageSerializationRegistryImpl();
-
         registry.registerFactory(Msg.GROUP_TYPE, Msg.TYPE, new MsgSerializationFactory());
 
         assertNotNull(registry.createSerializer(Msg.GROUP_TYPE, Msg.TYPE));
@@ -94,12 +90,24 @@ public class MessageSerializationRegistryImplTest {
      */
     @Test
     public void testCreateSerializersIfNotRegistered() {
-        var registry = new MessageSerializationRegistryImpl();
-
         assertThrows(NetworkConfigurationException.class, () -> registry.createSerializer(Msg.GROUP_TYPE, Msg.TYPE));
         assertThrows(NetworkConfigurationException.class, () -> registry.createDeserializer(Msg.GROUP_TYPE, Msg.TYPE));
     }
 
+    /**
+     * Tests that edge values of group and message types are handled without out-of-bound errors.
+     */
+    @Test
+    public void testEdgeValues() {
+        registry.registerFactory((short)0, (short)0, new MsgSerializationFactory());
+
+        assertNotNull(registry.createSerializer((short)0, (short)0));
+
+        registry.registerFactory(Short.MAX_VALUE, Short.MAX_VALUE, new MsgSerializationFactory());
+
+        assertNotNull(registry.createSerializer(Short.MAX_VALUE, Short.MAX_VALUE));
+    }
+
     /** */
     private static class Msg implements NetworkMessage {
         /** */
diff --git a/modules/network/src/test/java/org/apache/ignite/network/TestMessageTypes.java b/modules/network/src/test/java/org/apache/ignite/network/TestMessageTypes.java
index 94ba495..d00ab5b 100644
--- a/modules/network/src/test/java/org/apache/ignite/network/TestMessageTypes.java
+++ b/modules/network/src/test/java/org/apache/ignite/network/TestMessageTypes.java
@@ -21,9 +21,12 @@ import org.apache.ignite.network.annotations.MessageGroup;
 
 @MessageGroup(groupName = "TestMessages", groupType = 2)
 public class TestMessageTypes {
+    /** */
     public static final short ALL_TYPES = 1;
 
+    /** */
     public static final short TEST = 2;
 
+    /** */
     public static final short NESTED_MESSAGE = 3;
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
index 0386379..699bdd4 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
@@ -38,8 +38,9 @@ import org.apache.ignite.raft.client.WriteCommand;
 import org.apache.ignite.raft.client.message.ActionRequest;
 import org.apache.ignite.raft.client.message.GetLeaderRequest;
 import org.apache.ignite.raft.client.message.GetLeaderResponse;
-import org.apache.ignite.raft.client.message.RaftErrorResponse;
+import org.apache.ignite.raft.client.message.RaftClientMessageGroup;
 import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
+import org.apache.ignite.raft.client.message.RaftErrorResponse;
 import org.apache.ignite.raft.client.service.CommandClosure;
 import org.apache.ignite.raft.client.service.RaftGroupListener;
 import org.jetbrains.annotations.Nullable;
@@ -90,32 +91,35 @@ public class RaftServerImpl implements RaftServer {
         readQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
         writeQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
 
-        service.messagingService().addMessageHandler((message, senderAddr, correlationId) -> {
-            if (message instanceof GetLeaderRequest) {
-                var localPeer = new Peer(service.topologyService().localMember().address());
+        service.messagingService().addMessageHandler(
+            RaftClientMessageGroup.class,
+            (message, senderAddr, correlationId) -> {
+                if (message instanceof GetLeaderRequest) {
+                    var localPeer = new Peer(service.topologyService().localMember().address());
 
-                GetLeaderResponse resp = clientMsgFactory.getLeaderResponse().leader(localPeer).build();
+                    GetLeaderResponse resp = clientMsgFactory.getLeaderResponse().leader(localPeer).build();
 
-                service.messagingService().send(senderAddr, resp, correlationId);
-            }
-            else if (message instanceof ActionRequest) {
-                ActionRequest req0 = (ActionRequest)message;
+                    service.messagingService().send(senderAddr, resp, correlationId);
+                }
+                else if (message instanceof ActionRequest) {
+                    ActionRequest req0 = (ActionRequest)message;
 
-                RaftGroupListener lsnr = listeners.get(req0.groupId());
+                    RaftGroupListener lsnr = listeners.get(req0.groupId());
 
-                if (lsnr == null) {
-                    sendError(senderAddr, correlationId, RaftErrorCode.ILLEGAL_STATE);
+                    if (lsnr == null) {
+                        sendError(senderAddr, correlationId, RaftErrorCode.ILLEGAL_STATE);
 
-                    return;
-                }
+                        return;
+                    }
 
-                if (req0.command() instanceof ReadCommand)
-                    handleActionRequest(senderAddr, req0, correlationId, readQueue, lsnr);
-                else
-                    handleActionRequest(senderAddr, req0, correlationId, writeQueue, lsnr);
+                    if (req0.command() instanceof ReadCommand)
+                        handleActionRequest(senderAddr, req0, correlationId, readQueue, lsnr);
+                    else
+                        handleActionRequest(senderAddr, req0, correlationId, writeQueue, lsnr);
+                }
+                // TODO https://issues.apache.org/jira/browse/IGNITE-14775
             }
-            // TODO https://issues.apache.org/jira/browse/IGNITE-14775
-        });
+        );
 
         readWorker = new Thread(() -> processQueue(readQueue, RaftGroupListener::onRead), "read-cmd-worker#" + service.topologyService().localMember().toString());
         readWorker.setDaemon(true);
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcServer.java
index 7afb6c4..cc9f8b0 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcServer.java
@@ -23,20 +23,19 @@ import org.apache.ignite.raft.jraft.rpc.impl.ConnectionClosedEventListener;
  *
  */
 public interface RpcServer<T> extends Lifecycle<T> {
-
     /**
      * Register a conn closed event listener.
      *
      * @param listener the event listener.
      */
-    void registerConnectionClosedEventListener(final ConnectionClosedEventListener listener);
+    void registerConnectionClosedEventListener(ConnectionClosedEventListener listener);
 
     /**
      * Register user processor.
      *
      * @param processor the user processor which has a interest
      */
-    void registerProcessor(final RpcProcessor<?> processor);
+    void registerProcessor(RpcProcessor<?> processor);
 
     /**
      * @return bound port
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index 03833ed..fa7f43c 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -21,6 +21,9 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.raft.client.message.RaftClientMessageGroup;
+import org.apache.ignite.raft.jraft.RaftMessageGroup;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
@@ -58,6 +61,10 @@ import org.jetbrains.annotations.Nullable;
 public class IgniteRpcServer implements RpcServer<Void> {
     private final ClusterService service;
 
+    private final NodeManager nodeManager;
+
+    private final Executor rpcExecutor;
+
     private final List<ConnectionClosedEventListener> listeners = new CopyOnWriteArrayList<>();
 
     private final Map<String, RpcProcessor> processors = new ConcurrentHashMap<>();
@@ -77,6 +84,8 @@ public class IgniteRpcServer implements RpcServer<Void> {
         @Nullable Executor rpcExecutor
     ) {
         this.service = service;
+        this.nodeManager = nodeManager;
+        this.rpcExecutor = rpcExecutor;
 
         // raft server RPC
         AppendEntriesRequestProcessor appendEntriesRequestProcessor =
@@ -106,8 +115,30 @@ public class IgniteRpcServer implements RpcServer<Void> {
         registerProcessor(new ActionRequestProcessor(rpcExecutor, raftClientMessagesFactory));
         registerProcessor(new org.apache.ignite.raft.jraft.rpc.impl.client.SnapshotRequestProcessor(rpcExecutor, raftClientMessagesFactory));
 
-        service.messagingService().addMessageHandler((msg, senderAddr, corellationId) -> {
-            Class<? extends NetworkMessage> cls = msg.getClass();
+        var messageHandler = new RpcMessageHandler();
+
+        service.messagingService().addMessageHandler(RaftMessageGroup.class, messageHandler);
+        service.messagingService().addMessageHandler(RaftClientMessageGroup.class, messageHandler);
+
+        service.topologyService().addEventHandler(new TopologyEventHandler() {
+            @Override public void onAppeared(ClusterNode member) {
+                // TODO asch optimize start replicator https://issues.apache.org/jira/browse/IGNITE-14843
+            }
+
+            @Override public void onDisappeared(ClusterNode member) {
+                for (ConnectionClosedEventListener listener : listeners)
+                    listener.onClosed(service.topologyService().localMember().name(), member.name());
+            }
+        });
+    }
+
+    /**
+     * Implementation of a message handler that dispatches the incoming requests to a suitable {@link RpcProcessor}.
+     */
+    public class RpcMessageHandler implements NetworkMessageHandler {
+        /** {@inheritDoc} */
+        @Override public void onReceived(NetworkMessage message, NetworkAddress senderAddr, String correlationId) {
+            Class<? extends NetworkMessage> cls = message.getClass();
             RpcProcessor<NetworkMessage> prc = processors.get(cls.getName());
 
             // TODO asch cache mapping https://issues.apache.org/jira/browse/IGNITE-14832
@@ -128,7 +159,7 @@ public class IgniteRpcServer implements RpcServer<Void> {
             Executor executor = null;
 
             if (selector != null)
-                executor = selector.select(prc.getClass().getName(), msg, nodeManager);
+                executor = selector.select(prc.getClass().getName(), message, nodeManager);
 
             if (executor == null)
                 executor = prc.executor();
@@ -145,7 +176,7 @@ public class IgniteRpcServer implements RpcServer<Void> {
                     }
 
                     @Override public void sendResponse(Object responseObj) {
-                        service.messagingService().send(senderAddr, (NetworkMessage) responseObj, corellationId);
+                        service.messagingService().send(senderAddr, (NetworkMessage) responseObj, correlationId);
                     }
 
                     @Override public NetworkAddress getRemoteAddress() {
@@ -157,20 +188,9 @@ public class IgniteRpcServer implements RpcServer<Void> {
                     }
                 };
 
-                finalPrc.handleRequest(context, msg);
+                finalPrc.handleRequest(context, message);
             });
-        });
-
-        service.topologyService().addEventHandler(new TopologyEventHandler() {
-            @Override public void onAppeared(ClusterNode member) {
-                // TODO asch optimize start replicator https://issues.apache.org/jira/browse/IGNITE-14843
-            }
-
-            @Override public void onDisappeared(ClusterNode member) {
-                for (ConnectionClosedEventListener listener : listeners)
-                    listener.onClosed(service.topologyService().localMember().name(), member.name());
-            }
-        });
+        }
     }
 
     /** {@inheritDoc} */
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
index 6bc1a1e..a621b39 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
@@ -29,6 +29,7 @@ import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
  * RPC server configured for integration tests.
  */
 public class TestIgniteRpcServer extends IgniteRpcServer {
+    /** */
     private final NodeOptions nodeOptions;
 
     /**
@@ -51,9 +52,12 @@ public class TestIgniteRpcServer extends IgniteRpcServer {
             JRaftUtils.createRequestExecutor(nodeOptions)
         );
 
+        clusterService.messagingService().addMessageHandler(TestMessageGroup.class, new RpcMessageHandler());
+
         this.nodeOptions = nodeOptions;
     }
 
+    /** {@inheritDoc} */
     @Override public void shutdown() {
         super.shutdown();
 
diff --git a/modules/vault/src/integrationTest/java/org/apache/ignite/internal/vault/persistence/PersistencePropertiesVaultServiceTest.java b/modules/vault/src/integrationTest/java/org/apache/ignite/internal/vault/persistence/PersistencePropertiesVaultServiceTest.java
index 410afcb..a871fda 100644
--- a/modules/vault/src/integrationTest/java/org/apache/ignite/internal/vault/persistence/PersistencePropertiesVaultServiceTest.java
+++ b/modules/vault/src/integrationTest/java/org/apache/ignite/internal/vault/persistence/PersistencePropertiesVaultServiceTest.java
@@ -32,7 +32,7 @@ import org.apache.ignite.lang.ByteArray;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
-import static org.apache.ignite.internal.vault.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
diff --git a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java
index d8283cd..89c61d5 100644
--- a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java
+++ b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java
@@ -22,7 +22,7 @@ import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
-import static org.apache.ignite.internal.vault.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.nullValue;
diff --git a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java
index 5166587..e2337c9 100644
--- a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java
+++ b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java
@@ -35,7 +35,7 @@ import org.junit.jupiter.api.Test;
 
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
-import static org.apache.ignite.internal.vault.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;