You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/12/06 12:03:41 UTC
[ignite-3] branch main updated: IGNITE-18319 Pass sender consistendId instead of ClusterNode to handler (#1406)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9548206c58 IGNITE-18319 Pass sender consistendId instead of ClusterNode to handler (#1406)
9548206c58 is described below
commit 9548206c58bf06831a79aaf4efed44c0e869cfab
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Tue Dec 6 16:03:36 2022 +0400
IGNITE-18319 Pass sender consistendId instead of ClusterNode to handler (#1406)
---
.../management/ClusterManagementGroupManager.java | 14 +++++-----
.../internal/compute/ComputeComponentImpl.java | 16 +++++-------
.../internal/compute/ComputeComponentImplTest.java | 11 ++++----
.../java/org/apache/ignite/lang/ErrorGroups.java | 9 +++++++
.../apache/ignite/network/MessagingService.java | 14 ++++++++++
.../ignite/network/NetworkMessageHandler.java | 4 +--
...java => UnresolvableConsistentIdException.java} | 20 ++++++---------
.../scalecube/ItScaleCubeNetworkMessagingTest.java | 8 +++---
.../ignite/network/DefaultMessagingService.java | 30 ++++++++++++++--------
.../network/DefaultMessagingServiceTest.java | 25 +++++++++++++-----
.../raft/jraft/rpc/impl/IgniteRpcServer.java | 10 ++++++--
.../ignite/internal/replicator/ReplicaManager.java | 21 ++++++++-------
.../sql/engine/message/MessageServiceImpl.java | 4 +--
.../outgoing/OutgoingSnapshotsManager.java | 9 +++----
14 files changed, 119 insertions(+), 76 deletions(-)
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index c9d90d81a2..3fc97d55fd 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -180,17 +180,17 @@ public class ClusterManagementGroupManager implements IgniteComponent {
clusterService.messagingService().addMessageHandler(
CmgMessageGroup.class,
- messageHandlerFactory.wrapHandler((message, sender, correlationId) -> {
+ messageHandlerFactory.wrapHandler((message, senderConsistentId, correlationId) -> {
if (message instanceof ClusterStateMessage) {
assert correlationId != null;
- handleClusterState((ClusterStateMessage) message, sender, correlationId);
+ handleClusterState((ClusterStateMessage) message, senderConsistentId, correlationId);
} else if (message instanceof CancelInitMessage) {
handleCancelInit((CancelInitMessage) message);
} else if (message instanceof CmgInitMessage) {
assert correlationId != null;
- handleInit((CmgInitMessage) message, sender, correlationId);
+ handleInit((CmgInitMessage) message, senderConsistentId, correlationId);
}
})
);
@@ -250,7 +250,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
* we simply check that the Raft state and the received message are the same.</li>
* </ol>
*/
- private void handleInit(CmgInitMessage msg, ClusterNode sender, long correlationId) {
+ private void handleInit(CmgInitMessage msg, String senderConsistentId, long correlationId) {
synchronized (raftServiceLock) {
CompletableFuture<CmgRaftService> serviceFuture = raftService;
@@ -289,7 +289,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
.build();
}
- clusterService.messagingService().respond(sender, response, correlationId);
+ clusterService.messagingService().respond(senderConsistentId, response, correlationId);
return service;
}));
@@ -402,8 +402,8 @@ public class ClusterManagementGroupManager implements IgniteComponent {
/**
* Handler for the {@link ClusterStateMessage}.
*/
- private void handleClusterState(ClusterStateMessage msg, ClusterNode sender, long correlationId) {
- clusterService.messagingService().respond(sender, msgFactory.successResponseMessage().build(), correlationId);
+ private void handleClusterState(ClusterStateMessage msg, String senderConsistentId, long correlationId) {
+ clusterService.messagingService().respond(senderConsistentId, msgFactory.successResponseMessage().build(), correlationId);
ClusterState state = msg.clusterState();
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 179d58fea6..5fae438e23 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -177,7 +177,6 @@ public class ComputeComponentImpl implements ComputeComponent {
return future;
}
- @SuppressWarnings("unchecked")
private <R> CompletableFuture<R> resultFromExecuteResponse(ExecuteResponse executeResponse) {
if (executeResponse.throwable() != null) {
return CompletableFuture.failedFuture(executeResponse.throwable());
@@ -198,11 +197,11 @@ public class ComputeComponentImpl implements ComputeComponent {
new NamedThreadFactory(NamedThreadFactory.threadPrefix(ignite.name(), "compute"), LOG)
);
- messagingService.addMessageHandler(ComputeMessageTypes.class, (message, sender, correlationId) -> {
+ messagingService.addMessageHandler(ComputeMessageTypes.class, (message, senderConsistentId, correlationId) -> {
assert correlationId != null;
if (message instanceof ExecuteRequest) {
- processExecuteRequest((ExecuteRequest) message, sender, correlationId);
+ processExecuteRequest((ExecuteRequest) message, senderConsistentId, correlationId);
return;
}
@@ -215,9 +214,9 @@ public class ComputeComponentImpl implements ComputeComponent {
return new LinkedBlockingQueue<>();
}
- private void processExecuteRequest(ExecuteRequest executeRequest, ClusterNode sender, long correlationId) {
+ private void processExecuteRequest(ExecuteRequest executeRequest, String senderConsistentId, long correlationId) {
if (!busyLock.enterBusy()) {
- sendExecuteResponse(null, new NodeStoppingException(), sender, correlationId);
+ sendExecuteResponse(null, new NodeStoppingException(), senderConsistentId, correlationId);
return;
}
@@ -225,25 +224,24 @@ public class ComputeComponentImpl implements ComputeComponent {
Class<ComputeJob<Object>> jobClass = jobClass(executeRequest.jobClassName());
doExecuteLocally(jobClass, executeRequest.args())
- .handle((result, ex) -> sendExecuteResponse(result, ex, sender, correlationId));
+ .handle((result, ex) -> sendExecuteResponse(result, ex, senderConsistentId, correlationId));
} finally {
busyLock.leaveBusy();
}
}
@Nullable
- private Object sendExecuteResponse(Object result, Throwable ex, ClusterNode sender, Long correlationId) {
+ private Object sendExecuteResponse(@Nullable Object result, @Nullable Throwable ex, String senderConsistentId, Long correlationId) {
ExecuteResponse executeResponse = messagesFactory.executeResponse()
.result(result)
.throwable(ex)
.build();
- messagingService.respond(sender, executeResponse, correlationId);
+ messagingService.respond(senderConsistentId, executeResponse, correlationId);
return null;
}
- @SuppressWarnings("unchecked")
private <R, J extends ComputeJob<R>> Class<J> jobClass(String jobClassName) {
try {
return (Class<J>) Class.forName(jobClassName, true, jobClassLoader);
diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index e30fb76273..bfd7605a65 100644
--- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -30,6 +30,7 @@ import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.lenient;
@@ -220,7 +221,7 @@ class ComputeComponentImplTest {
markResponseSentOnResponseSend();
assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
- var sender = new ClusterNode("test", "test", new NetworkAddress("some-host", 1));
+ String sender = "test";
ExecuteRequest request = new ComputeMessagesFactory().executeRequest()
.jobClassName(SimpleJob.class.getName())
@@ -232,14 +233,14 @@ class ComputeComponentImplTest {
}
private void markResponseSentOnResponseSend() {
- when(messagingService.respond(any(), any(), anyLong()))
+ when(messagingService.respond(anyString(), any(), anyLong()))
.thenAnswer(invocation -> {
responseSent.set(true);
return null;
});
}
- private void assertThatExecuteResponseIsSentTo(ClusterNode sender) throws InterruptedException {
+ private void assertThatExecuteResponseIsSentTo(String sender) throws InterruptedException {
assertTrue(IgniteTestUtils.waitForCondition(responseSent::get, 1000), "No response sent");
verify(messagingService).respond(eq(sender), executeResponseCaptor.capture(), eq(123L));
@@ -294,7 +295,7 @@ class ComputeComponentImplTest {
markResponseSentOnResponseSend();
assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
- var sender = new ClusterNode("test", "test", new NetworkAddress("some-host", 1));
+ String sender = "test";
ExecuteRequest request = new ComputeMessagesFactory().executeRequest()
.jobClassName(SimpleJob.class.getName())
@@ -305,7 +306,7 @@ class ComputeComponentImplTest {
assertThatNodeStoppingExceptionIsSentTo(sender);
}
- private void assertThatNodeStoppingExceptionIsSentTo(ClusterNode sender) throws InterruptedException {
+ private void assertThatNodeStoppingExceptionIsSentTo(String sender) throws InterruptedException {
assertTrue(IgniteTestUtils.waitForCondition(responseSent::get, 1000), "No response sent");
verify(messagingService).respond(eq(sender), executeResponseCaptor.capture(), eq(123L));
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 4b215db2a8..1a11e32f98 100755
--- a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -332,4 +332,13 @@ public class ErrorGroups {
/** Distribution zone rename error. */
public static final int ZONE_RENAME_ERR = DISTRIBUTION_ZONES_ERR_GROUP.registerErrorCode(3);
}
+
+ /** Network error group. */
+ public static class Network {
+ /** Network error group. */
+ public static final ErrorGroup NETWORK_ERR_GROUP = ErrorGroup.newGroup("NETWORK", 11);
+
+ /** Unresolvable consistent ID. */
+ public static final int UNRESOLVABLE_CONSISTENT_ID_ERR = NETWORK_ERR_GROUP.registerErrorCode(1);
+ }
}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
index bc284f1ef6..614687077f 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
@@ -65,6 +65,20 @@ public interface MessagingService {
*/
CompletableFuture<Void> respond(ClusterNode recipient, NetworkMessage msg, long correlationId);
+ /**
+ * Sends a response to a {@link #invoke} request.
+ * Guarantees are the same as for the {@link #send(ClusterNode, NetworkMessage)}.
+ *
+ * <p>If the recipient cannot be resolved (because it has already left the physical topology), the returned future is resolved
+ * with the corresponding exception ({@link UnresolvableConsistentIdException}).
+ *
+ * @param recipientConsistentId Consistent ID of the recipient of the message.
+ * @param msg Message which should be delivered.
+ * @param correlationId Correlation id when replying to the request.
+ * @return Future of the send operation.
+ */
+ CompletableFuture<Void> respond(String recipientConsistentId, NetworkMessage msg, long correlationId);
+
/**
* Sends a message asynchronously with same guarantees as {@link #send(ClusterNode, NetworkMessage)} and returns a future that will be
* completed successfully upon receiving a response.
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
index dffcfd8070..4468b57c7e 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
@@ -27,9 +27,9 @@ public interface NetworkMessageHandler {
* Method that gets invoked when a network message is received.
*
* @param message Message, which was received from the cluster.
- * @param sender Sender node.
+ * @param senderConsistentId Consistend ID of the sender node.
* @param correlationId Correlation id. Used to track correspondence between requests and responses. Can be {@code null} if the received
* message is not a request from a {@link MessagingService#invoke} method from another node.
*/
- void onReceived(NetworkMessage message, ClusterNode sender, @Nullable Long correlationId);
+ void onReceived(NetworkMessage message, String senderConsistentId, @Nullable Long correlationId);
}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java b/modules/network-api/src/main/java/org/apache/ignite/network/UnresolvableConsistentIdException.java
similarity index 56%
copy from modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
copy to modules/network-api/src/main/java/org/apache/ignite/network/UnresolvableConsistentIdException.java
index dffcfd8070..ba5e28fe21 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/UnresolvableConsistentIdException.java
@@ -17,19 +17,15 @@
package org.apache.ignite.network;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.lang.ErrorGroups.Network;
+import org.apache.ignite.lang.IgniteException;
/**
- * Handler of incoming messages.
+ * Thrown when consistent ID cannot be resolved to a {@link ClusterNode} instance (i.e. when
+ * there is no node with such consistent ID in the physical topology).
*/
-public interface NetworkMessageHandler {
- /**
- * Method that gets invoked when a network message is received.
- *
- * @param message Message, which was received from the cluster.
- * @param sender Sender node.
- * @param correlationId Correlation id. Used to track correspondence between requests and responses. Can be {@code null} if the received
- * message is not a request from a {@link MessagingService#invoke} method from another node.
- */
- void onReceived(NetworkMessage message, ClusterNode sender, @Nullable Long correlationId);
+public class UnresolvableConsistentIdException extends IgniteException {
+ public UnresolvableConsistentIdException(String msg) {
+ super(Network.UNRESOLVABLE_CONSISTENT_ID_ERR, msg);
+ }
}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
index 554d5eb7b9..be52773cf1 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
@@ -161,13 +161,13 @@ class ItScaleCubeNetworkMessagingTest {
class Data {
private final TestMessage message;
- private final ClusterNode sender;
+ private final String senderConsistentId;
private final Long correlationId;
- private Data(TestMessage message, ClusterNode sender, Long correlationId) {
+ private Data(TestMessage message, String senderConsistentId, Long correlationId) {
this.message = message;
- this.sender = sender;
+ this.senderConsistentId = senderConsistentId;
this.correlationId = correlationId;
}
}
@@ -187,7 +187,7 @@ class ItScaleCubeNetworkMessagingTest {
Data actualData = dataFuture.get(3, TimeUnit.SECONDS);
assertThat(actualData.message.msg(), is(requestMessage.msg()));
- assertThat(actualData.sender.name(), is(self.name()));
+ assertThat(actualData.senderConsistentId, is(self.name()));
assertNull(actualData.correlationId);
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 1e9eec1f6c..e86a48a59e 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -115,25 +115,34 @@ public class DefaultMessagingService extends AbstractMessagingService {
connectionManager.addListener(this::onMessage);
}
- /** {@inheritDoc} */
@Override
public void weakSend(ClusterNode recipient, NetworkMessage msg) {
send(recipient, msg);
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg) {
return send0(recipient, msg, null);
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<Void> respond(ClusterNode recipient, NetworkMessage msg, long correlationId) {
return send0(recipient, msg, correlationId);
}
- /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Void> respond(String recipientConsistentId, NetworkMessage msg, long correlationId) {
+ ClusterNode recipient = topologyService.getByConsistentId(recipientConsistentId);
+
+ if (recipient == null) {
+ return failedFuture(
+ new UnresolvableConsistentIdException("Recipient consistent ID cannot be resolved: " + recipientConsistentId)
+ );
+ }
+
+ return respond(recipient, msg, correlationId);
+ }
+
@Override
public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout) {
return invoke0(recipient, msg, timeout);
@@ -245,7 +254,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
*/
private void sendToSelf(NetworkMessage msg, @Nullable Long correlationId) {
for (NetworkMessageHandler networkMessageHandler : getMessageHandlers(msg.groupType())) {
- networkMessageHandler.onReceived(msg, topologyService.localMember(), correlationId);
+ networkMessageHandler.onReceived(msg, topologyService.localMember().name(), correlationId);
}
}
@@ -263,7 +272,6 @@ public class DefaultMessagingService extends AbstractMessagingService {
NetworkMessage msg = obj.message();
DescriptorRegistry registry = obj.registry();
- String consistentId = obj.consistentId();
try {
msg.unmarshal(marshaller, registry);
} catch (Exception e) {
@@ -285,15 +293,15 @@ public class DefaultMessagingService extends AbstractMessagingService {
message = messageWithCorrelation.message();
}
- ClusterNode sender = topologyService.getByConsistentId(consistentId);
+ String senderConsistentId = obj.consistentId();
// Unfortunately, since the Messaging Service is used by ScaleCube itself, some messages can be sent
- // before the node is added to the topology. ScaleCubeMessage handler guarantees to handle null sender without throwing an
- // exception.
- assert message instanceof ScaleCubeMessage || sender != null : consistentId;
+ // before the node is added to the topology. ScaleCubeMessage handler guarantees to handle null sender consistent ID
+ // without throwing an exception.
+ assert message instanceof ScaleCubeMessage || senderConsistentId != null;
for (NetworkMessageHandler networkMessageHandler : getMessageHandlers(message.groupType())) {
- networkMessageHandler.onReceived(message, sender, correlationId);
+ networkMessageHandler.onReceived(message, senderConsistentId, correlationId);
}
}
diff --git a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
index 20fad46087..e56d7c252c 100644
--- a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
@@ -17,14 +17,18 @@
package org.apache.ignite.network;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -101,7 +105,7 @@ class DefaultMessagingServiceTest {
configureSender();
configureReceiver();
- when(topologyService.getByConsistentId(eq(senderNode.name()))).thenReturn(senderNode);
+ lenient().when(topologyService.getByConsistentId(eq(senderNode.name()))).thenReturn(senderNode);
messageSerializationRegistry.registerFactory(
(short) 2,
@@ -140,13 +144,22 @@ class DefaultMessagingServiceTest {
}
}
+ @Test
+ void respondingWhenSenderIsNotInTopologyResultsInFailingFuture() throws Exception {
+ try (Services services = createMessagingService(senderNode, senderNetworkConfig, () -> {})) {
+ CompletableFuture<Void> resultFuture = services.messagingService.respond("no-such-node", mock(NetworkMessage.class), 123);
+
+ assertThat(resultFuture, willThrow(UnresolvableConsistentIdException.class));
+ }
+ }
+
private void configureSender() {
when(senderNetworkConfigView.port()).thenReturn(SENDER_PORT);
configureNetworkDefaults(senderNetworkConfig, senderNetworkConfigView, senderOutboundConfig, senderInboundConfig);
}
private void configureReceiver() {
- when(receiverNetworkConfigView.port()).thenReturn(RECEIVER_PORT);
+ lenient().when(receiverNetworkConfigView.port()).thenReturn(RECEIVER_PORT);
configureNetworkDefaults(receiverNetworkConfig, receiverNetworkConfigView, receiverOutboundConfig, receiverInboundConfig);
}
@@ -156,10 +169,10 @@ class DefaultMessagingServiceTest {
OutboundView outboundConfig,
InboundView inboundConfig
) {
- when(networkConfig.value()).thenReturn(networkConfigView);
- when(networkConfigView.portRange()).thenReturn(0);
- when(networkConfigView.outbound()).thenReturn(outboundConfig);
- when(networkConfigView.inbound()).thenReturn(inboundConfig);
+ lenient().when(networkConfig.value()).thenReturn(networkConfigView);
+ lenient().when(networkConfigView.portRange()).thenReturn(0);
+ lenient().when(networkConfigView.outbound()).thenReturn(outboundConfig);
+ lenient().when(networkConfigView.inbound()).thenReturn(inboundConfig);
}
private static void awaitQuietly(CountDownLatch latch) {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index 6c6c30998d..3326f533c5 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.UnresolvableConsistentIdException;import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.NetworkMessageHandler;
import org.apache.ignite.network.TopologyEventHandler;
@@ -140,10 +140,16 @@ public class IgniteRpcServer implements RpcServer<Void> {
*/
public class RpcMessageHandler implements NetworkMessageHandler {
/** {@inheritDoc} */
- @Override public void onReceived(NetworkMessage message, ClusterNode sender, @Nullable Long correlationId) {
+ @Override public void onReceived(NetworkMessage message, String senderConsistentId, @Nullable Long correlationId) {
Class<? extends NetworkMessage> cls = message.getClass();
RpcProcessor<NetworkMessage> prc = processors.get(cls.getName());
+ ClusterNode sender = clusterService().topologyService().getByConsistentId(senderConsistentId);
+
+ if (sender == null) {
+ throw new UnresolvableConsistentIdException("No node by consistent ID " + senderConsistentId);
+ }
+
// TODO asch cache mapping https://issues.apache.org/jira/browse/IGNITE-14832
if (prc == null) {
for (Class<?> iface : cls.getInterfaces()) {
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 23c4549741..72f4cebabe 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -45,7 +45,6 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.NodeStoppingException;
-import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.NetworkMessageHandler;
@@ -107,7 +106,7 @@ public class ReplicaManager implements IgniteComponent {
this.clusterNetSvc = clusterNetSvc;
this.clock = clock;
this.messageGroupsToHandle = messageGroupsToHandle;
- this.handler = (message, sender, correlationId) -> {
+ this.handler = (message, senderConsistentId, correlationId) -> {
if (!busyLock.enterBusy()) {
throw new IgniteException(new NodeStoppingException());
}
@@ -131,7 +130,7 @@ public class ReplicaManager implements IgniteComponent {
ignore -> {
IgniteUtils.inBusyLock(
busyLock,
- () -> sendAwaitReplicaResponse(sender, correlationId)
+ () -> sendAwaitReplicaResponse(senderConsistentId, correlationId)
);
return null;
@@ -140,7 +139,7 @@ public class ReplicaManager implements IgniteComponent {
return replicaFut;
} else {
- IgniteUtils.inBusyLock(busyLock, () -> sendAwaitReplicaResponse(sender, correlationId));
+ IgniteUtils.inBusyLock(busyLock, () -> sendAwaitReplicaResponse(senderConsistentId, correlationId));
return replicaFut;
}
@@ -154,7 +153,7 @@ public class ReplicaManager implements IgniteComponent {
HybridTimestamp requestTimestamp = extractTimestamp(request);
if (replicaFut == null || !replicaFut.isDone()) {
- sendReplicaUnavailableErrorResponse(sender, correlationId, request, requestTimestamp);
+ sendReplicaUnavailableErrorResponse(senderConsistentId, correlationId, request, requestTimestamp);
return;
}
@@ -173,7 +172,7 @@ public class ReplicaManager implements IgniteComponent {
msg = prepareReplicaErrorResponse(requestTimestamp, ex);
}
- clusterNetSvc.messagingService().respond(sender, msg, correlationId);
+ clusterNetSvc.messagingService().respond(senderConsistentId, msg, correlationId);
return null;
});
@@ -305,14 +304,14 @@ public class ReplicaManager implements IgniteComponent {
* Sends replica unavailable error response.
*/
private void sendReplicaUnavailableErrorResponse(
- ClusterNode sender,
+ String senderConsistentId,
@Nullable Long correlationId,
ReplicaRequest request,
HybridTimestamp requestTimestamp
) {
if (requestTimestamp != null) {
clusterNetSvc.messagingService().respond(
- sender,
+ senderConsistentId,
REPLICA_MESSAGES_FACTORY
.errorTimestampAwareReplicaResponse()
.throwable(
@@ -325,7 +324,7 @@ public class ReplicaManager implements IgniteComponent {
correlationId);
} else {
clusterNetSvc.messagingService().respond(
- sender,
+ senderConsistentId,
REPLICA_MESSAGES_FACTORY
.errorReplicaResponse()
.throwable(
@@ -341,9 +340,9 @@ public class ReplicaManager implements IgniteComponent {
/**
* Sends await replica response.
*/
- private void sendAwaitReplicaResponse(ClusterNode sender, @Nullable Long correlationId) {
+ private void sendAwaitReplicaResponse(String senderConsistentId, @Nullable Long correlationId) {
clusterNetSvc.messagingService().respond(
- sender,
+ senderConsistentId,
REPLICA_MESSAGES_FACTORY
.awaitReplicaResponse()
.build(),
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
index 425a5c5c68..a0010e81b9 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
@@ -136,7 +136,7 @@ public class MessageServiceImpl implements MessageService {
}
}
- private void onMessage(NetworkMessage msg, ClusterNode sender, @Nullable Long correlationId) {
+ private void onMessage(NetworkMessage msg, String senderConsistentId, @Nullable Long correlationId) {
if (!busyLock.enterBusy()) {
return;
}
@@ -144,7 +144,7 @@ public class MessageServiceImpl implements MessageService {
try {
assert msg.groupType() == GROUP_TYPE : "unexpected message group grpType=" + msg.groupType();
- onMessage(sender.name(), msg);
+ onMessage(senderConsistentId, msg);
} finally {
busyLock.leaveBusy();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
index fa0a798663..3a7d46f5e7 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
@@ -42,7 +42,6 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.NetworkMessage;
import org.jetbrains.annotations.Nullable;
@@ -141,7 +140,7 @@ public class OutgoingSnapshotsManager implements PartitionsSnapshots, IgniteComp
}
}
- private void handleMessage(NetworkMessage networkMessage, ClusterNode sender, @Nullable Long correlationId) {
+ private void handleMessage(NetworkMessage networkMessage, String senderConsistentId, @Nullable Long correlationId) {
// Ignore all messages that we can't handle.
if (!(networkMessage instanceof SnapshotRequestMessage)) {
return;
@@ -163,7 +162,7 @@ public class OutgoingSnapshotsManager implements PartitionsSnapshots, IgniteComp
.supplyAsync(() -> handleSnapshotRequestMessage(networkMessage, outgoingSnapshot), executor)
.whenCompleteAsync((response, throwable) -> {
if (response != null) {
- respond(response, throwable, sender, correlationId);
+ respond(response, throwable, senderConsistentId, correlationId);
}
}, executor);
}
@@ -187,7 +186,7 @@ public class OutgoingSnapshotsManager implements PartitionsSnapshots, IgniteComp
private void respond(
NetworkMessage response,
Throwable throwable,
- ClusterNode sender,
+ String senderConsistentId,
Long correlationId
) {
if (throwable != null) {
@@ -196,7 +195,7 @@ public class OutgoingSnapshotsManager implements PartitionsSnapshots, IgniteComp
}
try {
- messagingService.respond(sender, response, correlationId);
+ messagingService.respond(senderConsistentId, response, correlationId);
} catch (RuntimeException e) {
LOG.warn("Could not send a response with correlationId=" + correlationId, e);
}