You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/12/06 12:03:41 UTC

[ignite-3] branch main updated: IGNITE-18319 Pass sender consistendId instead of ClusterNode to handler (#1406)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9548206c58 IGNITE-18319 Pass sender consistendId instead of ClusterNode to handler (#1406)
9548206c58 is described below

commit 9548206c58bf06831a79aaf4efed44c0e869cfab
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Tue Dec 6 16:03:36 2022 +0400

    IGNITE-18319 Pass sender consistendId instead of ClusterNode to handler (#1406)
---
 .../management/ClusterManagementGroupManager.java  | 14 +++++-----
 .../internal/compute/ComputeComponentImpl.java     | 16 +++++-------
 .../internal/compute/ComputeComponentImplTest.java | 11 ++++----
 .../java/org/apache/ignite/lang/ErrorGroups.java   |  9 +++++++
 .../apache/ignite/network/MessagingService.java    | 14 ++++++++++
 .../ignite/network/NetworkMessageHandler.java      |  4 +--
 ...java => UnresolvableConsistentIdException.java} | 20 ++++++---------
 .../scalecube/ItScaleCubeNetworkMessagingTest.java |  8 +++---
 .../ignite/network/DefaultMessagingService.java    | 30 ++++++++++++++--------
 .../network/DefaultMessagingServiceTest.java       | 25 +++++++++++++-----
 .../raft/jraft/rpc/impl/IgniteRpcServer.java       | 10 ++++++--
 .../ignite/internal/replicator/ReplicaManager.java | 21 ++++++++-------
 .../sql/engine/message/MessageServiceImpl.java     |  4 +--
 .../outgoing/OutgoingSnapshotsManager.java         |  9 +++----
 14 files changed, 119 insertions(+), 76 deletions(-)

diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index c9d90d81a2..3fc97d55fd 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -180,17 +180,17 @@ public class ClusterManagementGroupManager implements IgniteComponent {
 
         clusterService.messagingService().addMessageHandler(
                 CmgMessageGroup.class,
-                messageHandlerFactory.wrapHandler((message, sender, correlationId) -> {
+                messageHandlerFactory.wrapHandler((message, senderConsistentId, correlationId) -> {
                     if (message instanceof ClusterStateMessage) {
                         assert correlationId != null;
 
-                        handleClusterState((ClusterStateMessage) message, sender, correlationId);
+                        handleClusterState((ClusterStateMessage) message, senderConsistentId, correlationId);
                     } else if (message instanceof CancelInitMessage) {
                         handleCancelInit((CancelInitMessage) message);
                     } else if (message instanceof CmgInitMessage) {
                         assert correlationId != null;
 
-                        handleInit((CmgInitMessage) message, sender, correlationId);
+                        handleInit((CmgInitMessage) message, senderConsistentId, correlationId);
                     }
                 })
         );
@@ -250,7 +250,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
      *     we simply check that the Raft state and the received message are the same.</li>
      * </ol>
      */
-    private void handleInit(CmgInitMessage msg, ClusterNode sender, long correlationId) {
+    private void handleInit(CmgInitMessage msg, String senderConsistentId, long correlationId) {
         synchronized (raftServiceLock) {
             CompletableFuture<CmgRaftService> serviceFuture = raftService;
 
@@ -289,7 +289,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
                                             .build();
                                 }
 
-                                clusterService.messagingService().respond(sender, response, correlationId);
+                                clusterService.messagingService().respond(senderConsistentId, response, correlationId);
 
                                 return service;
                             }));
@@ -402,8 +402,8 @@ public class ClusterManagementGroupManager implements IgniteComponent {
     /**
      * Handler for the {@link ClusterStateMessage}.
      */
-    private void handleClusterState(ClusterStateMessage msg, ClusterNode sender, long correlationId) {
-        clusterService.messagingService().respond(sender, msgFactory.successResponseMessage().build(), correlationId);
+    private void handleClusterState(ClusterStateMessage msg, String senderConsistentId, long correlationId) {
+        clusterService.messagingService().respond(senderConsistentId, msgFactory.successResponseMessage().build(), correlationId);
 
         ClusterState state = msg.clusterState();
 
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 179d58fea6..5fae438e23 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -177,7 +177,6 @@ public class ComputeComponentImpl implements ComputeComponent {
         return future;
     }
 
-    @SuppressWarnings("unchecked")
     private <R> CompletableFuture<R> resultFromExecuteResponse(ExecuteResponse executeResponse) {
         if (executeResponse.throwable() != null) {
             return CompletableFuture.failedFuture(executeResponse.throwable());
@@ -198,11 +197,11 @@ public class ComputeComponentImpl implements ComputeComponent {
                 new NamedThreadFactory(NamedThreadFactory.threadPrefix(ignite.name(), "compute"), LOG)
         );
 
-        messagingService.addMessageHandler(ComputeMessageTypes.class, (message, sender, correlationId) -> {
+        messagingService.addMessageHandler(ComputeMessageTypes.class, (message, senderConsistentId, correlationId) -> {
             assert correlationId != null;
 
             if (message instanceof ExecuteRequest) {
-                processExecuteRequest((ExecuteRequest) message, sender, correlationId);
+                processExecuteRequest((ExecuteRequest) message, senderConsistentId, correlationId);
 
                 return;
             }
@@ -215,9 +214,9 @@ public class ComputeComponentImpl implements ComputeComponent {
         return new LinkedBlockingQueue<>();
     }
 
-    private void processExecuteRequest(ExecuteRequest executeRequest, ClusterNode sender, long correlationId) {
+    private void processExecuteRequest(ExecuteRequest executeRequest, String senderConsistentId, long correlationId) {
         if (!busyLock.enterBusy()) {
-            sendExecuteResponse(null, new NodeStoppingException(), sender, correlationId);
+            sendExecuteResponse(null, new NodeStoppingException(), senderConsistentId, correlationId);
             return;
         }
 
@@ -225,25 +224,24 @@ public class ComputeComponentImpl implements ComputeComponent {
             Class<ComputeJob<Object>> jobClass = jobClass(executeRequest.jobClassName());
 
             doExecuteLocally(jobClass, executeRequest.args())
-                    .handle((result, ex) -> sendExecuteResponse(result, ex, sender, correlationId));
+                    .handle((result, ex) -> sendExecuteResponse(result, ex, senderConsistentId, correlationId));
         } finally {
             busyLock.leaveBusy();
         }
     }
 
     @Nullable
-    private Object sendExecuteResponse(Object result, Throwable ex, ClusterNode sender, Long correlationId) {
+    private Object sendExecuteResponse(@Nullable Object result, @Nullable Throwable ex, String senderConsistentId, Long correlationId) {
         ExecuteResponse executeResponse = messagesFactory.executeResponse()
                 .result(result)
                 .throwable(ex)
                 .build();
 
-        messagingService.respond(sender, executeResponse, correlationId);
+        messagingService.respond(senderConsistentId, executeResponse, correlationId);
 
         return null;
     }
 
-    @SuppressWarnings("unchecked")
     private <R, J extends ComputeJob<R>> Class<J> jobClass(String jobClassName) {
         try {
             return (Class<J>) Class.forName(jobClassName, true, jobClassLoader);
diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index e30fb76273..bfd7605a65 100644
--- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -30,6 +30,7 @@ import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.lenient;
@@ -220,7 +221,7 @@ class ComputeComponentImplTest {
         markResponseSentOnResponseSend();
         assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
 
-        var sender = new ClusterNode("test", "test", new NetworkAddress("some-host", 1));
+        String sender = "test";
 
         ExecuteRequest request = new ComputeMessagesFactory().executeRequest()
                 .jobClassName(SimpleJob.class.getName())
@@ -232,14 +233,14 @@ class ComputeComponentImplTest {
     }
 
     private void markResponseSentOnResponseSend() {
-        when(messagingService.respond(any(), any(), anyLong()))
+        when(messagingService.respond(anyString(), any(), anyLong()))
                 .thenAnswer(invocation -> {
                     responseSent.set(true);
                     return null;
                 });
     }
 
-    private void assertThatExecuteResponseIsSentTo(ClusterNode sender) throws InterruptedException {
+    private void assertThatExecuteResponseIsSentTo(String sender) throws InterruptedException {
         assertTrue(IgniteTestUtils.waitForCondition(responseSent::get, 1000), "No response sent");
 
         verify(messagingService).respond(eq(sender), executeResponseCaptor.capture(), eq(123L));
@@ -294,7 +295,7 @@ class ComputeComponentImplTest {
         markResponseSentOnResponseSend();
         assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
 
-        var sender = new ClusterNode("test", "test", new NetworkAddress("some-host", 1));
+        String sender = "test";
 
         ExecuteRequest request = new ComputeMessagesFactory().executeRequest()
                 .jobClassName(SimpleJob.class.getName())
@@ -305,7 +306,7 @@ class ComputeComponentImplTest {
         assertThatNodeStoppingExceptionIsSentTo(sender);
     }
 
-    private void assertThatNodeStoppingExceptionIsSentTo(ClusterNode sender) throws InterruptedException {
+    private void assertThatNodeStoppingExceptionIsSentTo(String sender) throws InterruptedException {
         assertTrue(IgniteTestUtils.waitForCondition(responseSent::get, 1000), "No response sent");
 
         verify(messagingService).respond(eq(sender), executeResponseCaptor.capture(), eq(123L));
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 4b215db2a8..1a11e32f98 100755
--- a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -332,4 +332,13 @@ public class ErrorGroups {
         /** Distribution zone rename error. */
         public static final int ZONE_RENAME_ERR = DISTRIBUTION_ZONES_ERR_GROUP.registerErrorCode(3);
     }
+
+    /** Network error group. */
+    public static class Network {
+        /** Network error group. */
+        public static final ErrorGroup NETWORK_ERR_GROUP = ErrorGroup.newGroup("NETWORK", 11);
+
+        /** Unresolvable consistent ID. */
+        public static final int UNRESOLVABLE_CONSISTENT_ID_ERR = NETWORK_ERR_GROUP.registerErrorCode(1);
+    }
 }
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
index bc284f1ef6..614687077f 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
@@ -65,6 +65,20 @@ public interface MessagingService {
      */
     CompletableFuture<Void> respond(ClusterNode recipient, NetworkMessage msg, long correlationId);
 
+    /**
+     * Sends a response to a {@link #invoke} request.
+     * Guarantees are the same as for the {@link #send(ClusterNode, NetworkMessage)}.
+     *
+     * <p>If the recipient cannot be resolved (because it has already left the physical topology), the returned future is resolved
+     * with the corresponding exception ({@link UnresolvableConsistentIdException}).
+     *
+     * @param recipientConsistentId Consistent ID of the recipient of the message.
+     * @param msg Message which should be delivered.
+     * @param correlationId Correlation id when replying to the request.
+     * @return Future of the send operation.
+     */
+    CompletableFuture<Void> respond(String recipientConsistentId, NetworkMessage msg, long correlationId);
+
     /**
      * Sends a message asynchronously with same guarantees as {@link #send(ClusterNode, NetworkMessage)} and returns a future that will be
      * completed successfully upon receiving a response.
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
index dffcfd8070..4468b57c7e 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
@@ -27,9 +27,9 @@ public interface NetworkMessageHandler {
      * Method that gets invoked when a network message is received.
      *
      * @param message Message, which was received from the cluster.
-     * @param sender Sender node.
+     * @param senderConsistentId Consistent ID of the sender node.
      * @param correlationId Correlation id. Used to track correspondence between requests and responses. Can be {@code null} if the received
      *     message is not a request from a {@link MessagingService#invoke} method from another node.
      */
-    void onReceived(NetworkMessage message, ClusterNode sender, @Nullable Long correlationId);
+    void onReceived(NetworkMessage message, String senderConsistentId, @Nullable Long correlationId);
 }
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java b/modules/network-api/src/main/java/org/apache/ignite/network/UnresolvableConsistentIdException.java
similarity index 56%
copy from modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
copy to modules/network-api/src/main/java/org/apache/ignite/network/UnresolvableConsistentIdException.java
index dffcfd8070..ba5e28fe21 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/UnresolvableConsistentIdException.java
@@ -17,19 +17,15 @@
 
 package org.apache.ignite.network;
 
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.lang.ErrorGroups.Network;
+import org.apache.ignite.lang.IgniteException;
 
 /**
- * Handler of incoming messages.
+ * Thrown when consistent ID cannot be resolved to a {@link ClusterNode} instance (i.e. when
+ * there is no node with such consistent ID in the physical topology).
  */
-public interface NetworkMessageHandler {
-    /**
-     * Method that gets invoked when a network message is received.
-     *
-     * @param message Message, which was received from the cluster.
-     * @param sender Sender node.
-     * @param correlationId Correlation id. Used to track correspondence between requests and responses. Can be {@code null} if the received
-     *     message is not a request from a {@link MessagingService#invoke} method from another node.
-     */
-    void onReceived(NetworkMessage message, ClusterNode sender, @Nullable Long correlationId);
+public class UnresolvableConsistentIdException extends IgniteException {
+    public UnresolvableConsistentIdException(String msg) {
+        super(Network.UNRESOLVABLE_CONSISTENT_ID_ERR, msg);
+    }
 }
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
index 554d5eb7b9..be52773cf1 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
@@ -161,13 +161,13 @@ class ItScaleCubeNetworkMessagingTest {
         class Data {
             private final TestMessage message;
 
-            private final ClusterNode sender;
+            private final String senderConsistentId;
 
             private final Long correlationId;
 
-            private Data(TestMessage message, ClusterNode sender, Long correlationId) {
+            private Data(TestMessage message, String senderConsistentId, Long correlationId) {
                 this.message = message;
-                this.sender = sender;
+                this.senderConsistentId = senderConsistentId;
                 this.correlationId = correlationId;
             }
         }
@@ -187,7 +187,7 @@ class ItScaleCubeNetworkMessagingTest {
         Data actualData = dataFuture.get(3, TimeUnit.SECONDS);
 
         assertThat(actualData.message.msg(), is(requestMessage.msg()));
-        assertThat(actualData.sender.name(), is(self.name()));
+        assertThat(actualData.senderConsistentId, is(self.name()));
         assertNull(actualData.correlationId);
     }
 
diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 1e9eec1f6c..e86a48a59e 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -115,25 +115,34 @@ public class DefaultMessagingService extends AbstractMessagingService {
         connectionManager.addListener(this::onMessage);
     }
 
-    /** {@inheritDoc} */
     @Override
     public void weakSend(ClusterNode recipient, NetworkMessage msg) {
         send(recipient, msg);
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg) {
         return send0(recipient, msg, null);
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> respond(ClusterNode recipient, NetworkMessage msg, long correlationId) {
         return send0(recipient, msg, correlationId);
     }
 
-    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> respond(String recipientConsistentId, NetworkMessage msg, long correlationId) {
+        ClusterNode recipient = topologyService.getByConsistentId(recipientConsistentId);
+
+        if (recipient == null) {
+            return failedFuture(
+                    new UnresolvableConsistentIdException("Recipient consistent ID cannot be resolved: " + recipientConsistentId)
+            );
+        }
+
+        return respond(recipient, msg, correlationId);
+    }
+
     @Override
     public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout) {
         return invoke0(recipient, msg, timeout);
@@ -245,7 +254,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
      */
     private void sendToSelf(NetworkMessage msg, @Nullable Long correlationId) {
         for (NetworkMessageHandler networkMessageHandler : getMessageHandlers(msg.groupType())) {
-            networkMessageHandler.onReceived(msg, topologyService.localMember(), correlationId);
+            networkMessageHandler.onReceived(msg, topologyService.localMember().name(), correlationId);
         }
     }
 
@@ -263,7 +272,6 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
         NetworkMessage msg = obj.message();
         DescriptorRegistry registry = obj.registry();
-        String consistentId = obj.consistentId();
         try {
             msg.unmarshal(marshaller, registry);
         } catch (Exception e) {
@@ -285,15 +293,15 @@ public class DefaultMessagingService extends AbstractMessagingService {
             message = messageWithCorrelation.message();
         }
 
-        ClusterNode sender = topologyService.getByConsistentId(consistentId);
+        String senderConsistentId = obj.consistentId();
 
         // Unfortunately, since the Messaging Service is used by ScaleCube itself, some messages can be sent
-        // before the node is added to the topology. ScaleCubeMessage handler guarantees to handle null sender without throwing an
-        // exception.
-        assert message instanceof ScaleCubeMessage || sender != null : consistentId;
+        // before the node is added to the topology. ScaleCubeMessage handler guarantees to handle null sender consistent ID
+        // without throwing an exception.
+        assert message instanceof ScaleCubeMessage || senderConsistentId != null;
 
         for (NetworkMessageHandler networkMessageHandler : getMessageHandlers(message.groupType())) {
-            networkMessageHandler.onReceived(message, sender, correlationId);
+            networkMessageHandler.onReceived(message, senderConsistentId, correlationId);
         }
     }
 
diff --git a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
index 20fad46087..e56d7c252c 100644
--- a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
@@ -17,14 +17,18 @@
 
 package org.apache.ignite.network;
 
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -101,7 +105,7 @@ class DefaultMessagingServiceTest {
         configureSender();
         configureReceiver();
 
-        when(topologyService.getByConsistentId(eq(senderNode.name()))).thenReturn(senderNode);
+        lenient().when(topologyService.getByConsistentId(eq(senderNode.name()))).thenReturn(senderNode);
 
         messageSerializationRegistry.registerFactory(
                 (short) 2,
@@ -140,13 +144,22 @@ class DefaultMessagingServiceTest {
         }
     }
 
+    @Test
+    void respondingWhenSenderIsNotInTopologyResultsInFailingFuture() throws Exception {
+        try (Services services = createMessagingService(senderNode, senderNetworkConfig, () -> {})) {
+            CompletableFuture<Void> resultFuture = services.messagingService.respond("no-such-node", mock(NetworkMessage.class), 123);
+
+            assertThat(resultFuture, willThrow(UnresolvableConsistentIdException.class));
+        }
+    }
+
     private void configureSender() {
         when(senderNetworkConfigView.port()).thenReturn(SENDER_PORT);
         configureNetworkDefaults(senderNetworkConfig, senderNetworkConfigView, senderOutboundConfig, senderInboundConfig);
     }
 
     private void configureReceiver() {
-        when(receiverNetworkConfigView.port()).thenReturn(RECEIVER_PORT);
+        lenient().when(receiverNetworkConfigView.port()).thenReturn(RECEIVER_PORT);
         configureNetworkDefaults(receiverNetworkConfig, receiverNetworkConfigView, receiverOutboundConfig, receiverInboundConfig);
     }
 
@@ -156,10 +169,10 @@ class DefaultMessagingServiceTest {
             OutboundView outboundConfig,
             InboundView inboundConfig
     ) {
-        when(networkConfig.value()).thenReturn(networkConfigView);
-        when(networkConfigView.portRange()).thenReturn(0);
-        when(networkConfigView.outbound()).thenReturn(outboundConfig);
-        when(networkConfigView.inbound()).thenReturn(inboundConfig);
+        lenient().when(networkConfig.value()).thenReturn(networkConfigView);
+        lenient().when(networkConfigView.portRange()).thenReturn(0);
+        lenient().when(networkConfigView.outbound()).thenReturn(outboundConfig);
+        lenient().when(networkConfigView.inbound()).thenReturn(inboundConfig);
     }
 
     private static void awaitQuietly(CountDownLatch latch) {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index 6c6c30998d..3326f533c5 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.UnresolvableConsistentIdException;import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.NetworkMessageHandler;
 import org.apache.ignite.network.TopologyEventHandler;
@@ -140,10 +140,16 @@ public class IgniteRpcServer implements RpcServer<Void> {
      */
     public class RpcMessageHandler implements NetworkMessageHandler {
         /** {@inheritDoc} */
-        @Override public void onReceived(NetworkMessage message, ClusterNode sender, @Nullable Long correlationId) {
+        @Override public void onReceived(NetworkMessage message, String senderConsistentId, @Nullable Long correlationId) {
             Class<? extends NetworkMessage> cls = message.getClass();
             RpcProcessor<NetworkMessage> prc = processors.get(cls.getName());
 
+            ClusterNode sender = clusterService().topologyService().getByConsistentId(senderConsistentId);
+
+            if (sender == null) {
+                throw new UnresolvableConsistentIdException("No node by consistent ID " + senderConsistentId);
+            }
+
             // TODO asch cache mapping https://issues.apache.org/jira/browse/IGNITE-14832
             if (prc == null) {
                 for (Class<?> iface : cls.getInterfaces()) {
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 23c4549741..72f4cebabe 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -45,7 +45,6 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.NodeStoppingException;
-import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.NetworkMessageHandler;
@@ -107,7 +106,7 @@ public class ReplicaManager implements IgniteComponent {
         this.clusterNetSvc = clusterNetSvc;
         this.clock = clock;
         this.messageGroupsToHandle = messageGroupsToHandle;
-        this.handler = (message, sender, correlationId) -> {
+        this.handler = (message, senderConsistentId, correlationId) -> {
             if (!busyLock.enterBusy()) {
                 throw new IgniteException(new NodeStoppingException());
             }
@@ -131,7 +130,7 @@ public class ReplicaManager implements IgniteComponent {
                                     ignore -> {
                                         IgniteUtils.inBusyLock(
                                                 busyLock,
-                                                () -> sendAwaitReplicaResponse(sender, correlationId)
+                                                () -> sendAwaitReplicaResponse(senderConsistentId, correlationId)
                                         );
 
                                         return null;
@@ -140,7 +139,7 @@ public class ReplicaManager implements IgniteComponent {
 
                             return replicaFut;
                         } else {
-                            IgniteUtils.inBusyLock(busyLock, () -> sendAwaitReplicaResponse(sender, correlationId));
+                            IgniteUtils.inBusyLock(busyLock, () -> sendAwaitReplicaResponse(senderConsistentId, correlationId));
 
                             return replicaFut;
                         }
@@ -154,7 +153,7 @@ public class ReplicaManager implements IgniteComponent {
                 HybridTimestamp requestTimestamp = extractTimestamp(request);
 
                 if (replicaFut == null || !replicaFut.isDone()) {
-                    sendReplicaUnavailableErrorResponse(sender, correlationId, request, requestTimestamp);
+                    sendReplicaUnavailableErrorResponse(senderConsistentId, correlationId, request, requestTimestamp);
 
                     return;
                 }
@@ -173,7 +172,7 @@ public class ReplicaManager implements IgniteComponent {
                         msg = prepareReplicaErrorResponse(requestTimestamp, ex);
                     }
 
-                    clusterNetSvc.messagingService().respond(sender, msg, correlationId);
+                    clusterNetSvc.messagingService().respond(senderConsistentId, msg, correlationId);
 
                     return null;
                 });
@@ -305,14 +304,14 @@ public class ReplicaManager implements IgniteComponent {
      * Sends replica unavailable error response.
      */
     private void sendReplicaUnavailableErrorResponse(
-            ClusterNode sender,
+            String senderConsistentId,
             @Nullable Long correlationId,
             ReplicaRequest request,
             HybridTimestamp requestTimestamp
     ) {
         if (requestTimestamp != null) {
             clusterNetSvc.messagingService().respond(
-                    sender,
+                    senderConsistentId,
                     REPLICA_MESSAGES_FACTORY
                             .errorTimestampAwareReplicaResponse()
                             .throwable(
@@ -325,7 +324,7 @@ public class ReplicaManager implements IgniteComponent {
                     correlationId);
         } else {
             clusterNetSvc.messagingService().respond(
-                    sender,
+                    senderConsistentId,
                     REPLICA_MESSAGES_FACTORY
                             .errorReplicaResponse()
                             .throwable(
@@ -341,9 +340,9 @@ public class ReplicaManager implements IgniteComponent {
     /**
      * Sends await replica response.
      */
-    private void sendAwaitReplicaResponse(ClusterNode sender, @Nullable Long correlationId) {
+    private void sendAwaitReplicaResponse(String senderConsistentId, @Nullable Long correlationId) {
         clusterNetSvc.messagingService().respond(
-                sender,
+                senderConsistentId,
                 REPLICA_MESSAGES_FACTORY
                         .awaitReplicaResponse()
                         .build(),
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
index 425a5c5c68..a0010e81b9 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
@@ -136,7 +136,7 @@ public class MessageServiceImpl implements MessageService {
         }
     }
 
-    private void onMessage(NetworkMessage msg, ClusterNode sender, @Nullable Long correlationId) {
+    private void onMessage(NetworkMessage msg, String senderConsistentId, @Nullable Long correlationId) {
         if (!busyLock.enterBusy()) {
             return;
         }
@@ -144,7 +144,7 @@ public class MessageServiceImpl implements MessageService {
         try {
             assert msg.groupType() == GROUP_TYPE : "unexpected message group grpType=" + msg.groupType();
 
-            onMessage(sender.name(), msg);
+            onMessage(senderConsistentId, msg);
         } finally {
             busyLock.leaveBusy();
         }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
index fa0a798663..3a7d46f5e7 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
@@ -42,7 +42,6 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkMessage;
 import org.jetbrains.annotations.Nullable;
@@ -141,7 +140,7 @@ public class OutgoingSnapshotsManager implements PartitionsSnapshots, IgniteComp
         }
     }
 
-    private void handleMessage(NetworkMessage networkMessage, ClusterNode sender, @Nullable Long correlationId) {
+    private void handleMessage(NetworkMessage networkMessage, String senderConsistentId, @Nullable Long correlationId) {
         // Ignore all messages that we can't handle.
         if (!(networkMessage instanceof SnapshotRequestMessage)) {
             return;
@@ -163,7 +162,7 @@ public class OutgoingSnapshotsManager implements PartitionsSnapshots, IgniteComp
                 .supplyAsync(() -> handleSnapshotRequestMessage(networkMessage, outgoingSnapshot), executor)
                 .whenCompleteAsync((response, throwable) -> {
                     if (response != null) {
-                        respond(response, throwable, sender, correlationId);
+                        respond(response, throwable, senderConsistentId, correlationId);
                     }
                 }, executor);
     }
@@ -187,7 +186,7 @@ public class OutgoingSnapshotsManager implements PartitionsSnapshots, IgniteComp
     private void respond(
             NetworkMessage response,
             Throwable throwable,
-            ClusterNode sender,
+            String senderConsistentId,
             Long correlationId
     ) {
         if (throwable != null) {
@@ -196,7 +195,7 @@ public class OutgoingSnapshotsManager implements PartitionsSnapshots, IgniteComp
         }
 
         try {
-            messagingService.respond(sender, response, correlationId);
+            messagingService.respond(senderConsistentId, response, correlationId);
         } catch (RuntimeException e) {
             LOG.warn("Could not send a response with correlationId=" + correlationId, e);
         }