You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2023/03/22 11:57:17 UTC
[ignite-3] branch main updated: IGNITE-16948 Multichannel message transfer (#1780)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0489a9e3eb IGNITE-16948 Multichannel message transfer (#1780)
0489a9e3eb is described below
commit 0489a9e3eb9f62a66a5203301d30222b71dd0b1e
Author: Mikhail <Po...@users.noreply.github.com>
AuthorDate: Wed Mar 22 14:57:12 2023 +0300
IGNITE-16948 Multichannel message transfer (#1780)
---
.../internal/deployunit/DeploymentManagerImpl.java | 12 +-
.../org/apache/ignite/network/ChannelType.java | 127 +++++++++++++++++++
.../ignite/network/ChannelTypeAlreadyExist.java} | 26 ++--
.../apache/ignite/network/MessagingService.java | 100 +++++++++++++--
.../org/apache/ignite/network/ChannelTypeTest.java | 70 +++++++++++
modules/network/build.gradle | 1 +
.../network/netty/ItConnectionManagerTest.java | 77 ++++++------
.../internal/network/netty/ConnectionManager.java | 69 +++++------
.../internal/network/netty/ConnectorKey.java | 72 +++++++++++
.../ignite/internal/network/netty/NettySender.java | 19 ++-
.../recovery/RecoveryClientHandshakeManager.java | 14 ++-
... => RecoveryClientHandshakeManagerFactory.java} | 2 +-
.../recovery/RecoveryServerHandshakeManager.java | 19 ++-
.../ignite/network/DefaultMessagingService.java | 40 +++---
.../internal/network/netty/NettyClientTest.java | 2 +-
.../network/DefaultMessagingServiceTest.java | 136 ++++++++++++++++++++-
.../engine/framework/ClusterServiceFactory.java | 11 +-
17 files changed, 657 insertions(+), 140 deletions(-)
diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index 1d51272eff..855b902844 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.network.ChannelType;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
@@ -83,6 +84,8 @@ public class DeploymentManagerImpl implements IgniteDeployment, IgniteComponent
private static final String TMP_SUFFIX = ".tmp";
+ private static final ChannelType DEPLOYMENT_CHANNEL = ChannelType.register((short) 1, "DeploymentUnits");
+
/**
* Meta storage.
*/
@@ -220,7 +223,8 @@ public class DeploymentManagerImpl implements IgniteDeployment, IgniteComponent
}).thenApply(logicalTopologySnapshot -> {
for (ClusterNode node : logicalTopologySnapshot.nodes()) {
clusterService.messagingService()
- .invoke(node, UndeployUnitRequestImpl.builder()
+ .invoke(node, DEPLOYMENT_CHANNEL,
+ UndeployUnitRequestImpl.builder()
.id(id)
.version(version.render())
.build(),
@@ -302,7 +306,7 @@ public class DeploymentManagerImpl implements IgniteDeployment, IgniteComponent
if (throwable != null) {
builder.error(throwable);
}
- clusterService.messagingService().respond(senderConsistentId,
+ clusterService.messagingService().respond(senderConsistentId, DEPLOYMENT_CHANNEL,
builder.build(), correlationId);
});
}
@@ -317,12 +321,12 @@ public class DeploymentManagerImpl implements IgniteDeployment, IgniteComponent
} catch (IOException e) {
LOG.error("Failed to undeploy unit " + executeRequest.id() + ":" + executeRequest.version(), e);
clusterService.messagingService()
- .respond(senderConsistentId, UndeployUnitResponseImpl.builder().error(e).build(), correlationId);
+ .respond(senderConsistentId, DEPLOYMENT_CHANNEL, UndeployUnitResponseImpl.builder().error(e).build(), correlationId);
return;
}
clusterService.messagingService()
- .respond(senderConsistentId, UndeployUnitResponseImpl.builder().build(), correlationId);
+ .respond(senderConsistentId, DEPLOYMENT_CHANNEL, UndeployUnitResponseImpl.builder().build(), correlationId);
}
private CompletableFuture<Boolean> doDeploy(DeployUnitRequest executeRequest) {
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ChannelType.java b/modules/network-api/src/main/java/org/apache/ignite/network/ChannelType.java
new file mode 100644
index 0000000000..5ff0cc751b
--- /dev/null
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/ChannelType.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * Data class with channel information.
+ * Can be passed to {@link MessagingService} methods to select the channel used for sending a message.
+ */
+public final class ChannelType {
+ static {
+ Map<Short, ChannelType> tmpChannels = new ConcurrentHashMap<>(IgniteUtils.capacity(Short.MAX_VALUE));
+ ChannelType defaultChannel = new ChannelType((short) 0, "Default");
+ tmpChannels.put((short) 0, defaultChannel);
+
+ DEFAULT = defaultChannel;
+ channels = tmpChannels;
+ }
+
+ public static final ChannelType DEFAULT;
+
+ private static final Map<Short, ChannelType> channels;
+
+ /**
+ * Channel identifier.
+ */
+ private final short id;
+
+ /**
+ * Channel name.
+ */
+ private final String name;
+
+ private ChannelType(short id, String name) {
+ this.id = id;
+ this.name = name;
+ }
+
+ /**
+ * Channel identifier, must be unique for each registered channel.
+ *
+ * @return Channel identifier.
+ */
+ public short id() {
+ return id;
+ }
+
+ /**
+ * Returns channel name.
+ *
+ * @return Channel name.
+ */
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public int hashCode() {
+ return id();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof ChannelType)) {
+ return false;
+ }
+ ChannelType type = (ChannelType) obj;
+ return Objects.equals(id(), type.id());
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
+
+ /**
+ * Registers a channel with the provided identifier. If the identifier is already used
+ * by another channel, throws {@link ChannelTypeAlreadyExist}.
+ *
+ * @param id Channel identifier. Must not be negative.
+ * @param name Channel name.
+ * @return Newly registered channel.
+ * @throws ChannelTypeAlreadyExist If the channel identifier is already in use, regardless of the name.
+ */
+ public static ChannelType register(short id, String name) {
+ if (id < 0) {
+ throw new IllegalArgumentException("Negative identifier is not supported.");
+ }
+ ChannelType newChannel = new ChannelType(id, name);
+ ChannelType channelType = channels.putIfAbsent(id, newChannel);
+ if (channelType != null) {
+ throw new ChannelTypeAlreadyExist(id, name);
+ }
+ return newChannel;
+ }
+
+ /**
+ * Returns the channel with the provided identifier or
+ * {@code null} if no channel with this identifier has been registered yet.
+ *
+ * @param id Channel identifier.
+ * @return Channel with the provided identifier or {@code null} if no channel with this identifier has been registered yet.
+ */
+ public static ChannelType getChannel(short id) {
+ return channels.get(id);
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandhakeManagerFactory.java b/modules/network-api/src/main/java/org/apache/ignite/network/ChannelTypeAlreadyExist.java
similarity index 51%
copy from modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandhakeManagerFactory.java
copy to modules/network-api/src/main/java/org/apache/ignite/network/ChannelTypeAlreadyExist.java
index 2ab0dfec29..f08a5e4dc1 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandhakeManagerFactory.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/ChannelTypeAlreadyExist.java
@@ -15,27 +15,19 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.network.recovery;
-
-import java.util.UUID;
+package org.apache.ignite.network;
/**
- * Factory producing {@link RecoveryClientHandshakeManager} instances.
+ * Thrown when registering a channel with an identifier that is already in use.
*/
-public interface RecoveryClientHandhakeManagerFactory {
+public class ChannelTypeAlreadyExist extends RuntimeException {
/**
- * Produces a {@link RecoveryClientHandshakeManager} instance.
+ * Constructor.
*
- * @param launchId ID of the launch.
- * @param consistentId Consistent ID of the node.
- * @param connectionId ID of the connection.
- * @param recoveryDescriptorProvider Provider of recovery descriptors to be used.
- * @return Created manager.
+ * @param id Channel identifier.
+ * @param name Channel name.
*/
- RecoveryClientHandshakeManager create(
- UUID launchId,
- String consistentId,
- short connectionId,
- RecoveryDescriptorProvider recoveryDescriptorProvider
- );
+ public ChannelTypeAlreadyExist(short id, String name) {
+ super("Channel " + name + " can't be registered because id " + id + " already used.");
+ }
}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
index 1b8f576598..72554af0ed 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
@@ -35,7 +35,7 @@ public interface MessagingService {
void weakSend(ClusterNode recipient, NetworkMessage msg);
/**
- * Tries to send the given message asynchronously to the specific cluster member.
+ * Tries to send the given message via default channel asynchronously to the specific cluster member.
*
* <p>Guarantees:
* <ul>
@@ -52,7 +52,29 @@ public interface MessagingService {
* @param msg Message which should be delivered.
* @return Future of the send operation.
*/
- CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg);
+ default CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg) {
+ return send(recipient, ChannelType.DEFAULT, msg);
+ }
+
+ /**
+ * Tries to send the given message via specified channel asynchronously to the specific cluster member.
+ *
+ * <p>Guarantees:
+ * <ul>
+ * <li>Messages sent to the same receiver will be delivered in the same order as they were sent;</li>
+ * <li>If a message N has been successfully delivered to a member, then all messages to the same receiver
+ * preceding N have also been successfully delivered.</li>
+ * </ul>
+ *
+ * <p>Please note that the guarantees only work for same (sender, receiver) pairs. That is, if A sends m1 and m2
+ * to B, then the guarantees are maintained. If, on the other hand, A sends m1 to B and m2 to C, then no guarantees
+ * exist.
+ *
+ * @param recipient Recipient of the message.
+ * @param msg Message which should be delivered.
+ * @return Future of the send operation.
+ */
+ CompletableFuture<Void> send(ClusterNode recipient, ChannelType channelType, NetworkMessage msg);
/**
* Sends a response to a {@link #invoke} request.
@@ -63,12 +85,25 @@ public interface MessagingService {
* @param correlationId Correlation id when replying to the request.
* @return Future of the send operation.
*/
- CompletableFuture<Void> respond(ClusterNode recipient, NetworkMessage msg, long correlationId);
+ default CompletableFuture<Void> respond(ClusterNode recipient, NetworkMessage msg, long correlationId) {
+ return respond(recipient, ChannelType.DEFAULT, msg, correlationId);
+ }
/**
* Sends a response to a {@link #invoke} request.
* Guarantees are the same as for the {@link #send(ClusterNode, NetworkMessage)}.
*
+ * @param recipient Recipient of the message.
+ * @param msg Message which should be delivered.
+ * @param correlationId Correlation id when replying to the request.
+ * @return Future of the send operation.
+ */
+ CompletableFuture<Void> respond(ClusterNode recipient, ChannelType channelType, NetworkMessage msg, long correlationId);
+
+ /**
+ * Sends a response to a {@link #invoke} request via default channel.
+ * Guarantees are the same as for the {@link #send(ClusterNode, NetworkMessage)}.
+ *
* <p>If the recipient cannot be resolved (because it has already left the physical topology), the returned future is resolved
* with the corresponding exception ({@link UnresolvableConsistentIdException}).
*
@@ -77,10 +112,28 @@ public interface MessagingService {
* @param correlationId Correlation id when replying to the request.
* @return Future of the send operation.
*/
- CompletableFuture<Void> respond(String recipientConsistentId, NetworkMessage msg, long correlationId);
+ default CompletableFuture<Void> respond(String recipientConsistentId, NetworkMessage msg, long correlationId) {
+ return respond(recipientConsistentId, ChannelType.DEFAULT, msg, correlationId);
+ }
/**
- * Sends a message asynchronously with same guarantees as {@link #send(ClusterNode, NetworkMessage)} and returns a future that will be
+ * Sends a response to a {@link #invoke} request via specified channel.
+ * Guarantees are the same as for the {@link #send(ClusterNode, NetworkMessage)}.
+ *
+ * <p>If the recipient cannot be resolved (because it has already left the physical topology), the returned future is resolved
+ * with the corresponding exception ({@link UnresolvableConsistentIdException}).
+ *
+ * @param recipientConsistentId Consistent ID of the recipient of the message.
+ * @param channelType Channel which will be used for message transfer.
+ * @param msg Message which should be delivered.
+ * @param correlationId Correlation id when replying to the request.
+ * @return Future of the send operation.
+ */
+ CompletableFuture<Void> respond(String recipientConsistentId, ChannelType channelType, NetworkMessage msg, long correlationId);
+
+ /**
+ * Sends a message via default channel asynchronously with same guarantees as
+ * {@link #send(ClusterNode, NetworkMessage)} and returns a future that will be
* completed successfully upon receiving a response.
*
* @param recipient Recipient of the message.
@@ -88,18 +141,49 @@ public interface MessagingService {
* @param timeout Waiting for response timeout in milliseconds.
* @return A future holding the response or error if the expected response was not received.
*/
- CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout);
+ default CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout) {
+ return invoke(recipient, ChannelType.DEFAULT, msg, timeout);
+ }
+
+ /**
+ * Sends a message asynchronously via specified channel with same guarantees as
+ * {@link #send(ClusterNode, NetworkMessage)} and returns a future that will be
+ * completed successfully upon receiving a response.
+ *
+ * @param recipient Recipient of the message.
+ * @param channelType Channel which will be used for message transfer.
+ * @param msg Message which should be delivered.
+ * @param timeout Waiting for response timeout in milliseconds.
+ * @return A future holding the response or error if the expected response was not received.
+ */
+ CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, ChannelType channelType, NetworkMessage msg, long timeout);
+
+ /**
+ * Sends a message via default channel asynchronously with same guarantees as
+ * {@link #send(ClusterNode, NetworkMessage)} and returns a future that will be
+ * completed successfully upon receiving a response.
+ *
+ * @param recipientConsistentId Consistent ID of the recipient of the message.
+ * @param msg The message.
+ * @param timeout Waiting for response timeout in milliseconds.
+ * @return A future holding the response or error if the expected response was not received.
+ */
+ default CompletableFuture<NetworkMessage> invoke(String recipientConsistentId, NetworkMessage msg, long timeout) {
+ return invoke(recipientConsistentId, ChannelType.DEFAULT, msg, timeout);
+ }
/**
- * Sends a message asynchronously with same guarantees as {@link #send(ClusterNode, NetworkMessage)} and returns a future that will be
+ * Sends a message via specified channel asynchronously with same guarantees as
+ * {@link #send(ClusterNode, NetworkMessage)} and returns a future that will be
* completed successfully upon receiving a response.
*
* @param recipientConsistentId Consistent ID of the recipient of the message.
+ * @param channelType Channel which will be used for message transfer.
* @param msg The message.
* @param timeout Waiting for response timeout in milliseconds.
* @return A future holding the response or error if the expected response was not received.
*/
- CompletableFuture<NetworkMessage> invoke(String recipientConsistentId, NetworkMessage msg, long timeout);
+ CompletableFuture<NetworkMessage> invoke(String recipientConsistentId, ChannelType channelType, NetworkMessage msg, long timeout);
/**
* Registers a listener for a group of network message events.
diff --git a/modules/network-api/src/test/java/org/apache/ignite/network/ChannelTypeTest.java b/modules/network-api/src/test/java/org/apache/ignite/network/ChannelTypeTest.java
new file mode 100644
index 0000000000..47058ea4e8
--- /dev/null
+++ b/modules/network-api/src/test/java/org/apache/ignite/network/ChannelTypeTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test suite for {@link ChannelType}.
+ */
+public class ChannelTypeTest {
+ private static short OFFSET = Short.MAX_VALUE - 100;
+
+ @Test
+ public void testRegister() {
+ ChannelType testRegister = ChannelType.register(OFFSET, "TestRegister");
+
+ assertThat(testRegister, is(ChannelType.getChannel(OFFSET)));
+ OFFSET++;
+ }
+
+ @Test
+ public void testRegisterAlready() {
+ ChannelType.register(OFFSET, "TestRegister1");
+
+ assertThrows(ChannelTypeAlreadyExist.class, () -> ChannelType.register(OFFSET, "TestRegister2"));
+
+ OFFSET++;
+ }
+
+ @Test
+ public void testRegisterSame() {
+ ChannelType.register(OFFSET, "TestRegister1");
+
+ assertThrows(ChannelTypeAlreadyExist.class, () -> ChannelType.register(OFFSET, "TestRegister1"));
+
+ OFFSET++;
+ }
+
+ @Test
+ public void testGetNotRegistered() {
+ ChannelType channel = ChannelType.getChannel(OFFSET);
+
+ assertThat(channel, nullValue());
+ }
+
+ @Test
+ public void testRegisterNegativeIndex() {
+ assertThrows(IllegalArgumentException.class, () -> ChannelType.register((short) -1, "test"));
+ }
+}
diff --git a/modules/network/build.gradle b/modules/network/build.gradle
index d145b7c496..27f5816aa5 100644
--- a/modules/network/build.gradle
+++ b/modules/network/build.gradle
@@ -62,6 +62,7 @@ dependencies {
testImplementation libs.slf4j.jdk14
testImplementation libs.bytebuddy
testImplementation libs.compileTesting
+ testImplementation libs.awaitility
testFixturesAnnotationProcessor project(":ignite-network-annotation-processor")
testFixturesImplementation project(':ignite-configuration')
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index ea33e7d394..bfd7eace45 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ChannelType;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.OutNetworkObject;
@@ -95,22 +96,23 @@ public class ItConnectionManagerTest {
int port1 = 4000;
int port2 = 4001;
- ConnectionManagerWrapper manager1 = startManager(port1);
- ConnectionManagerWrapper manager2 = startManager(port2);
- var fut = new CompletableFuture<NetworkMessage>();
+ try (ConnectionManagerWrapper manager1 = startManager(port1);
+ ConnectionManagerWrapper manager2 = startManager(port2)) {
+ var fut = new CompletableFuture<NetworkMessage>();
- manager2.connectionManager.addListener((obj) -> fut.complete(obj.message()));
+ manager2.connectionManager.addListener((obj) -> fut.complete(obj.message()));
- NettySender sender = manager1.openChannelTo(manager2).get(3, TimeUnit.SECONDS);
+ NettySender sender = manager1.openChannelTo(manager2).get(3, TimeUnit.SECONDS);
- TestMessage testMessage = messageFactory.testMessage().msg(msgText).build();
+ TestMessage testMessage = messageFactory.testMessage().msg(msgText).build();
- sender.send(new OutNetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
+ sender.send(new OutNetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
- NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
+ NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
- assertEquals(msgText, ((TestMessage) receivedMessage).msg());
+ assertEquals(msgText, ((TestMessage) receivedMessage).msg());
+ }
}
/**
@@ -127,40 +129,40 @@ public class ItConnectionManagerTest {
int port1 = 4000;
int port2 = 4001;
- ConnectionManagerWrapper manager1 = startManager(port1);
- ConnectionManagerWrapper manager2 = startManager(port2);
+ try (ConnectionManagerWrapper manager1 = startManager(port1);
+ ConnectionManagerWrapper manager2 = startManager(port2)) {
+ var fut = new CompletableFuture<NetworkMessage>();
- var fut = new CompletableFuture<NetworkMessage>();
-
- manager1.connectionManager.addListener((obj) -> fut.complete(obj.message()));
+ manager1.connectionManager.addListener((obj) -> fut.complete(obj.message()));
- NettySender senderFrom1to2 = manager1.openChannelTo(manager2).get(3, TimeUnit.SECONDS);
+ NettySender senderFrom1to2 = manager1.openChannelTo(manager2).get(3, TimeUnit.SECONDS);
- // Ensure a handshake has finished on both sides by sending a message.
- // TODO: IGNITE-16947 When the recovery protocol is implemented replace this with simple
- // CompletableFuture#get called on the send future.
- var messageReceivedOn2 = new CompletableFuture<Void>();
+ // Ensure a handshake has finished on both sides by sending a message.
+ // TODO: IGNITE-16947 When the recovery protocol is implemented replace this with simple
+ // CompletableFuture#get called on the send future.
+ var messageReceivedOn2 = new CompletableFuture<Void>();
- // If the message is received, that means that the handshake was successfully performed.
- manager2.connectionManager.addListener((message) -> messageReceivedOn2.complete(null));
+ // If the message is received, that means that the handshake was successfully performed.
+ manager2.connectionManager.addListener((message) -> messageReceivedOn2.complete(null));
- senderFrom1to2.send(new OutNetworkObject(testMessage, Collections.emptyList()));
+ senderFrom1to2.send(new OutNetworkObject(testMessage, Collections.emptyList()));
- messageReceivedOn2.get(3, TimeUnit.SECONDS);
+ messageReceivedOn2.get(3, TimeUnit.SECONDS);
- NettySender senderFrom2to1 = manager2.openChannelTo(manager1).get(3, TimeUnit.SECONDS);
+ NettySender senderFrom2to1 = manager2.openChannelTo(manager1).get(3, TimeUnit.SECONDS);
- InetSocketAddress clientLocalAddress = (InetSocketAddress) senderFrom1to2.channel().localAddress();
+ InetSocketAddress clientLocalAddress = (InetSocketAddress) senderFrom1to2.channel().localAddress();
- InetSocketAddress clientRemoteAddress = (InetSocketAddress) senderFrom2to1.channel().remoteAddress();
+ InetSocketAddress clientRemoteAddress = (InetSocketAddress) senderFrom2to1.channel().remoteAddress();
- assertEquals(clientLocalAddress, clientRemoteAddress);
+ assertEquals(clientLocalAddress, clientRemoteAddress);
- senderFrom2to1.send(new OutNetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
+ senderFrom2to1.send(new OutNetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
- NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
+ NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
- assertEquals(msgText, ((TestMessage) receivedMessage).msg());
+ assertEquals(msgText, ((TestMessage) receivedMessage).msg());
+ }
}
/**
@@ -250,9 +252,7 @@ public class ItConnectionManagerTest {
public void testConnectMisconfiguredServer() throws Exception {
ConnectionManagerWrapper client = startManager(4000);
- ConnectionManagerWrapper server = startManager(4001, mockSerializationRegistry());
-
- try {
+ try (ConnectionManagerWrapper server = startManager(4001, mockSerializationRegistry())) {
client.openChannelTo(server).get(3, TimeUnit.SECONDS);
} catch (ExecutionException e) {
assertThat(e.getCause(), isA(IOException.class));
@@ -266,9 +266,7 @@ public class ItConnectionManagerTest {
public void testConnectMisconfiguredClient() throws Exception {
ConnectionManagerWrapper client = startManager(4000, mockSerializationRegistry());
- ConnectionManagerWrapper server = startManager(4001);
-
- try {
+ try (ConnectionManagerWrapper server = startManager(4001)) {
client.openChannelTo(server).get(3, TimeUnit.SECONDS);
} catch (ExecutionException e) {
assertThat(e.getCause(), isA(DecoderException.class));
@@ -376,7 +374,12 @@ public class ItConnectionManagerTest {
}
OrderingFuture<NettySender> openChannelTo(ConnectionManagerWrapper recipient) {
- return connectionManager.channel(recipient.connectionManager.consistentId(), recipient.connectionManager.localAddress());
+ return connectionManager.channel(
+ recipient.connectionManager.consistentId(),
+ ChannelType.DEFAULT,
+ recipient.connectionManager.localAddress()
+ );
}
+
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 18ef33dbfd..66a5ea74b6 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.network.netty;
+import static org.apache.ignite.network.ChannelType.getChannel;
+
import io.netty.bootstrap.Bootstrap;
import java.net.InetSocketAddress;
import java.util.Collection;
@@ -37,12 +39,13 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.configuration.NetworkView;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
-import org.apache.ignite.internal.network.recovery.RecoveryClientHandhakeManagerFactory;
import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManagerFactory;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ChannelType;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -66,12 +69,11 @@ public class ConnectionManager {
/** Server. */
private final NettyServer server;
- // TODO: IGNITE-16948 Should be a map consistentId -> connectionId -> sender
/** Channels map from consistentId to {@link NettySender}. */
- private final Map<String, NettySender> channels = new ConcurrentHashMap<>();
+ private final Map<ConnectorKey<String>, NettySender> channels = new ConcurrentHashMap<>();
/** Clients. */
- private final Map<InetSocketAddress, NettyClient> clients = new ConcurrentHashMap<>();
+ private final Map<ConnectorKey<InetSocketAddress>, NettyClient> clients = new ConcurrentHashMap<>();
/** Serialization service. */
private final SerializationService serializationService;
@@ -86,7 +88,7 @@ public class ConnectionManager {
private final UUID launchId;
/** Factory producing {@link RecoveryClientHandshakeManager} instances. */
- private final RecoveryClientHandhakeManagerFactory clientHandhakeManagerFactory;
+ private final RecoveryClientHandshakeManagerFactory clientHandshakeManagerFactory;
/** Start flag. */
private final AtomicBoolean started = new AtomicBoolean(false);
@@ -122,7 +124,7 @@ public class ConnectionManager {
launchId,
consistentId,
bootstrapFactory,
- new DefaultRecoveryClientHandhakeManagerFactory()
+ new DefaultRecoveryClientHandshakeManagerFactory()
);
}
@@ -134,7 +136,7 @@ public class ConnectionManager {
* @param launchId Launch id of this node.
* @param consistentId Consistent id of this node.
* @param bootstrapFactory Bootstrap factory.
- * @param clientHandhakeManagerFactory Factory for {@link RecoveryClientHandshakeManager} instances.
+ * @param clientHandshakeManagerFactory Factory for {@link RecoveryClientHandshakeManager} instances.
*/
public ConnectionManager(
NetworkView networkConfiguration,
@@ -142,12 +144,12 @@ public class ConnectionManager {
UUID launchId,
String consistentId,
NettyBootstrapFactory bootstrapFactory,
- RecoveryClientHandhakeManagerFactory clientHandhakeManagerFactory
+ RecoveryClientHandshakeManagerFactory clientHandshakeManagerFactory
) {
this.serializationService = serializationService;
this.launchId = launchId;
this.consistentId = consistentId;
- this.clientHandhakeManagerFactory = clientHandhakeManagerFactory;
+ this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
this.networkConfiguration = networkConfiguration;
this.server = new NettyServer(
@@ -208,13 +210,13 @@ public class ConnectionManager {
* @param address Another node's address.
* @return Sender.
*/
- public OrderingFuture<NettySender> channel(@Nullable String consistentId, InetSocketAddress address) {
+ public OrderingFuture<NettySender> channel(@Nullable String consistentId, ChannelType type, InetSocketAddress address) {
if (consistentId != null) {
// If consistent id is known, try looking up a channel by consistent id. There can be an outbound connection
// or an inbound connection associated with that consistent id.
NettySender channel = channels.compute(
- consistentId,
- (addr, sender) -> (sender == null || !sender.isOpen()) ? null : sender
+ new ConnectorKey<>(consistentId, type),
+ (key, sender) -> (sender == null || !sender.isOpen()) ? null : sender
);
if (channel != null) {
@@ -225,10 +227,9 @@ public class ConnectionManager {
// Get an existing client or create a new one. NettyClient provides a CompletableFuture that resolves
// when the client is ready for write operations, so previously started client, that didn't establish connection
// or didn't perform the handshake operation, can be reused.
- // TODO: IGNITE-16948 Connection id may be different from 0
NettyClient client = clients.compute(
- address,
- (addr, existingClient) -> isClientConnected(existingClient) ? existingClient : connect(addr, (short) 0)
+ new ConnectorKey<>(address, type),
+ (key, existingClient) -> isClientConnected(existingClient) ? existingClient : connect(key.id(), key.type())
);
return client.sender();
@@ -253,7 +254,8 @@ public class ConnectionManager {
* @param channel Channel from client to this {@link #server}.
*/
private void onNewIncomingChannel(NettySender channel) {
- NettySender oldChannel = channels.put(channel.consistentId(), channel);
+ ConnectorKey<String> key = new ConnectorKey<>(channel.consistentId(), getChannel(channel.channelId()));
+ NettySender oldChannel = channels.put(key, channel);
if (oldChannel != null) {
oldChannel.close();
@@ -266,20 +268,21 @@ public class ConnectionManager {
* @param address Target address.
* @return New netty client.
*/
- private NettyClient connect(InetSocketAddress address, short connectionId) {
+ private NettyClient connect(InetSocketAddress address, ChannelType channelType) {
var client = new NettyClient(
address,
serializationService,
- createClientHandshakeManager(connectionId),
+ createClientHandshakeManager(channelType.id()),
this::onMessage,
this.networkConfiguration.ssl()
);
client.start(clientBootstrap).whenComplete((sender, throwable) -> {
if (throwable == null) {
- channels.put(sender.consistentId(), sender);
+ ConnectorKey<String> key = new ConnectorKey<>(sender.consistentId(), getChannel(sender.channelId()));
+ channels.put(key, sender);
} else {
- clients.remove(address);
+ clients.remove(new ConnectorKey<>(address, channelType));
}
});
@@ -331,7 +334,7 @@ public class ConnectionManager {
}
private HandshakeManager createClientHandshakeManager(short connectionId) {
- return clientHandhakeManagerFactory.create(
+ return clientHandshakeManagerFactory.create(
launchId,
consistentId,
connectionId,
@@ -353,6 +356,11 @@ public class ConnectionManager {
return server;
}
+ @TestOnly
+ public SerializationService serializationService() {
+ return serializationService;
+ }
+
/**
* Returns this node's consistent id.
*
@@ -372,23 +380,16 @@ public class ConnectionManager {
return Collections.unmodifiableCollection(clients.values());
}
- /**
- * Returns map of the channels.
- *
- * @return Map of the channels.
- */
- @TestOnly
- public Map<String, NettySender> channels() {
- return Collections.unmodifiableMap(channels);
- }
-
/**
* Factory producing vanilla {@link RecoveryClientHandshakeManager} instances.
*/
- private static class DefaultRecoveryClientHandhakeManagerFactory implements RecoveryClientHandhakeManagerFactory {
+ private static class DefaultRecoveryClientHandshakeManagerFactory implements RecoveryClientHandshakeManagerFactory {
@Override
- public RecoveryClientHandshakeManager create(UUID launchId, String consistentId, short connectionId,
- RecoveryDescriptorProvider recoveryDescriptorProvider) {
+ public RecoveryClientHandshakeManager create(UUID launchId,
+ String consistentId,
+ short connectionId,
+ RecoveryDescriptorProvider recoveryDescriptorProvider
+ ) {
return new RecoveryClientHandshakeManager(launchId, consistentId, connectionId, recoveryDescriptorProvider);
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectorKey.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectorKey.java
new file mode 100644
index 0000000000..9b0f39e4a9
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectorKey.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import org.apache.ignite.network.ChannelType;
+
+/**
+ * Composite key object for connectors.
+ *
+ * @param <T> identifier type.
+ */
+public class ConnectorKey<T> {
+ private final T id;
+
+ private final ChannelType type;
+
+ /**
+ * Constructor.
+ *
+ * @param id Connector identifier.
+ * @param type Channel type.
+ */
+ public ConnectorKey(T id, ChannelType type) {
+ this.id = id;
+ this.type = type;
+ }
+
+ public T id() {
+ return id;
+ }
+
+ public ChannelType type() {
+ return type;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ConnectorKey<?> that = (ConnectorKey<?>) o;
+
+ return (id != null ? id.equals(that.id) : that.id == null)
+ && (type != null ? type.equals(that.type) : that.type == null);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = id != null ? id.hashCode() : 0;
+ result = 31 * result + (type != null ? type.hashCode() : 0);
+ return result;
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
index 3239e12103..2294646d60 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
@@ -39,17 +39,21 @@ public class NettySender {
/** Consistent id of the remote node. */
private final String consistentId;
+ private final short channelId;
+
/**
* Constructor.
*
- * @param channel Netty channel.
- * @param launchId Launch id of the remote node.
+ * @param channel Netty channel.
+ * @param launchId Launch id of the remote node.
* @param consistentId Consistent id of the remote node.
+ * @param channelId channel identifier.
*/
- public NettySender(Channel channel, String launchId, String consistentId) {
+ public NettySender(Channel channel, String launchId, String consistentId, short channelId) {
this.channel = channel;
this.launchId = launchId;
this.consistentId = consistentId;
+ this.channelId = channelId;
}
/**
@@ -80,6 +84,15 @@ public class NettySender {
return consistentId;
}
+ /**
+ * Returns channel identifier.
+ *
+ * @return Channel identifier.
+ */
+ public short channelId() {
+ return channelId;
+ }
+
/**
* Closes channel.
*/
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index 87b2a889e7..8925b19834 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -87,7 +87,9 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
* @param recoveryDescriptorProvider Recovery descriptor provider.
*/
public RecoveryClientHandshakeManager(
- UUID launchId, String consistentId, short connectionId,
+ UUID launchId,
+ String consistentId,
+ short connectionId,
RecoveryDescriptorProvider recoveryDescriptorProvider) {
this.launchId = launchId;
this.consistentId = consistentId;
@@ -112,8 +114,12 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
this.remoteLaunchId = msg.launchId();
this.remoteConsistentId = msg.consistentId();
- this.recoveryDescriptor = recoveryDescriptorProvider.getRecoveryDescriptor(remoteConsistentId, remoteLaunchId, connectionId,
- false);
+ this.recoveryDescriptor = recoveryDescriptorProvider.getRecoveryDescriptor(
+ remoteConsistentId,
+ remoteLaunchId,
+ connectionId,
+ false
+ );
handshake(recoveryDescriptor);
@@ -199,7 +205,7 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
// Removes handshake handler from the pipeline as the handshake is finished
this.ctx.pipeline().remove(this.handler);
- handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId));
+ handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId, connectionId));
}
@TestOnly
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandhakeManagerFactory.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerFactory.java
similarity index 96%
rename from modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandhakeManagerFactory.java
rename to modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerFactory.java
index 2ab0dfec29..b4f88559ce 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandhakeManagerFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerFactory.java
@@ -22,7 +22,7 @@ import java.util.UUID;
/**
* Factory producing {@link RecoveryClientHandshakeManager} instances.
*/
-public interface RecoveryClientHandhakeManagerFactory {
+public interface RecoveryClientHandshakeManagerFactory {
/**
* Produces a {@link RecoveryClientHandshakeManager} instance.
*
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
index 3a252b33ef..54501be1d8 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
@@ -61,6 +61,8 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
/** Remote node's consistent id. */
private String remoteConsistentId;
+ private short remoteChannelId;
+
/** Netty pipeline channel handler context. */
private ChannelHandlerContext ctx;
@@ -88,8 +90,11 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
* @param recoveryDescriptorProvider Recovery descriptor provider.
*/
public RecoveryServerHandshakeManager(
- UUID launchId, String consistentId, NetworkMessagesFactory messageFactory,
- RecoveryDescriptorProvider recoveryDescriptorProvider) {
+ UUID launchId,
+ String consistentId,
+ NetworkMessagesFactory messageFactory,
+ RecoveryDescriptorProvider recoveryDescriptorProvider
+ ) {
this.launchId = launchId;
this.consistentId = consistentId;
this.messageFactory = messageFactory;
@@ -132,9 +137,13 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
this.remoteLaunchId = msg.launchId();
this.remoteConsistentId = msg.consistentId();
this.receivedCount = msg.receivedCount();
+ this.remoteChannelId = msg.connectionId();
- this.recoveryDescriptor = recoveryDescriptorProvider.getRecoveryDescriptor(remoteConsistentId, remoteLaunchId,
- msg.connectionId(), true);
+ this.recoveryDescriptor = recoveryDescriptorProvider.getRecoveryDescriptor(
+ remoteConsistentId,
+ remoteLaunchId,
+ remoteChannelId,
+ true);
handshake(recoveryDescriptor);
@@ -216,7 +225,7 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
// Removes handshake handler from the pipeline as the handshake is finished
this.ctx.pipeline().remove(this.handler);
- handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId));
+ handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId, remoteChannelId));
}
@TestOnly
diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 7bde48da3f..fef1ed918e 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiPredicate;
import java.util.function.Function;
+import org.apache.ignite.internal.future.OrderingFuture;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
@@ -127,17 +128,17 @@ public class DefaultMessagingService extends AbstractMessagingService {
}
@Override
- public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg) {
- return send0(recipient, msg, null);
+ public CompletableFuture<Void> send(ClusterNode recipient, ChannelType channelType, NetworkMessage msg) {
+ return send0(recipient, channelType, msg, null);
}
@Override
- public CompletableFuture<Void> respond(ClusterNode recipient, NetworkMessage msg, long correlationId) {
- return send0(recipient, msg, correlationId);
+ public CompletableFuture<Void> respond(ClusterNode recipient, ChannelType type, NetworkMessage msg, long correlationId) {
+ return send0(recipient, type, msg, correlationId);
}
@Override
- public CompletableFuture<Void> respond(String recipientConsistentId, NetworkMessage msg, long correlationId) {
+ public CompletableFuture<Void> respond(String recipientConsistentId, ChannelType type, NetworkMessage msg, long correlationId) {
ClusterNode recipient = topologyService.getByConsistentId(recipientConsistentId);
if (recipient == null) {
@@ -150,13 +151,13 @@ public class DefaultMessagingService extends AbstractMessagingService {
}
@Override
- public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout) {
- return invoke0(recipient, msg, timeout);
+ public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, ChannelType type, NetworkMessage msg, long timeout) {
+ return invoke0(recipient, type, msg, timeout);
}
/** {@inheritDoc} */
@Override
- public CompletableFuture<NetworkMessage> invoke(String recipientConsistentId, NetworkMessage msg, long timeout) {
+ public CompletableFuture<NetworkMessage> invoke(String recipientConsistentId, ChannelType type, NetworkMessage msg, long timeout) {
ClusterNode recipient = topologyService.getByConsistentId(recipientConsistentId);
if (recipient == null) {
@@ -165,7 +166,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
);
}
- return invoke0(recipient, msg, timeout);
+ return invoke0(recipient, type, msg, timeout);
}
/**
@@ -176,7 +177,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
* @param correlationId Correlation id. Not null iff the message is a response to a {@link #invoke} request.
* @return Future of the send operation.
*/
- private CompletableFuture<Void> send0(ClusterNode recipient, NetworkMessage msg, @Nullable Long correlationId) {
+ private CompletableFuture<Void> send0(ClusterNode recipient, ChannelType type, NetworkMessage msg, @Nullable Long correlationId) {
if (connectionManager.isStopped()) {
return failedFuture(new NodeStoppingException());
}
@@ -200,7 +201,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
NetworkMessage message = correlationId != null ? responseFromMessage(msg, correlationId) : msg;
- return sendMessage0(recipient.name(), recipientAddress, message);
+ return sendMessage0(recipient.name(), type, recipientAddress, message);
}
private boolean shouldDropMessage(ClusterNode recipient, NetworkMessage msg) {
@@ -217,7 +218,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
* @param timeout Invocation timeout.
* @return A future holding the response or error if the expected response was not received.
*/
- private CompletableFuture<NetworkMessage> invoke0(ClusterNode recipient, NetworkMessage msg, long timeout) {
+ private CompletableFuture<NetworkMessage> invoke0(ClusterNode recipient, ChannelType type, NetworkMessage msg, long timeout) {
if (connectionManager.isStopped()) {
return failedFuture(new NodeStoppingException());
}
@@ -244,21 +245,27 @@ public class DefaultMessagingService extends AbstractMessagingService {
InvokeRequest message = requestFromMessage(msg, correlationId);
- return sendMessage0(recipient.name(), recipientAddress, message).thenCompose(unused -> responseFuture);
+ return sendMessage0(recipient.name(), type, recipientAddress, message).thenCompose(unused -> responseFuture);
}
/**
* Sends network object.
*
* @param consistentId Target consistent ID. Can be {@code null} if the node has not been added to the topology.
+ * @param type Channel type for send.
* @param addr Target address.
* @param message Message.
*
* @return Future of the send operation.
*/
- private CompletableFuture<Void> sendMessage0(@Nullable String consistentId, InetSocketAddress addr, NetworkMessage message) {
+ private CompletableFuture<Void> sendMessage0(
+ @Nullable String consistentId,
+ ChannelType type,
+ InetSocketAddress addr,
+ NetworkMessage message
+ ) {
if (isInNetworkThread()) {
- return CompletableFuture.supplyAsync(() -> sendMessage0(consistentId, addr, message), outboundExecutor)
+ return CompletableFuture.supplyAsync(() -> sendMessage0(consistentId, type, addr, message), outboundExecutor)
.thenCompose(Function.identity());
}
@@ -270,7 +277,8 @@ public class DefaultMessagingService extends AbstractMessagingService {
return failedFuture(new IgniteException("Failed to marshal message: " + e.getMessage(), e));
}
- return connectionManager.channel(consistentId, addr)
+ OrderingFuture<NettySender> channel = connectionManager.channel(consistentId, type, addr);
+ return channel
.thenComposeToCompletable(sender -> sender.send(new OutNetworkObject(message, descriptors)));
}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
index 466278e413..4a9337c38d 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
@@ -255,7 +255,7 @@ public class NettyClientTest {
/** Constructor. */
private MockClientHandshakeManager(Channel channel) {
- this.sender = new NettySender(channel, "", "");
+ this.sender = new NettySender(channel, "", "", (short) 0);
}
/** {@inheritDoc} */
diff --git a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
index 5b1754b349..5987dcf41f 100644
--- a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
@@ -19,8 +19,10 @@ package org.apache.ignite.network;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.lenient;
@@ -33,16 +35,20 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
+import org.apache.ignite.internal.network.messages.AllTypesMessageImpl;
import org.apache.ignite.internal.network.messages.TestMessage;
+import org.apache.ignite.internal.network.messages.TestMessageImpl;
+import org.apache.ignite.internal.network.messages.TestMessageSerializationFactory;
import org.apache.ignite.internal.network.messages.TestMessageTypes;
import org.apache.ignite.internal.network.messages.TestMessagesFactory;
import org.apache.ignite.internal.network.netty.ConnectionManager;
-import org.apache.ignite.internal.network.recovery.RecoveryClientHandhakeManagerFactory;
import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManagerFactory;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
@@ -51,7 +57,10 @@ import org.apache.ignite.internal.network.serialization.UserObjectSerializationC
import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.serialization.MessageDeserializer;
+import org.apache.ignite.network.serialization.MessageSerializationFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.network.serialization.MessageSerializer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -64,6 +73,8 @@ class DefaultMessagingServiceTest {
private static final int SENDER_PORT = 2001;
private static final int RECEIVER_PORT = 2002;
+ private static final ChannelType TEST_CHANNEL = ChannelType.register(Short.MAX_VALUE, "Test");
+
@Mock
private TopologyService topologyService;
@@ -133,6 +144,109 @@ class DefaultMessagingServiceTest {
}
}
+ @Test
+ public void sendMessagesOneChannel() throws Exception {
+ AtomicBoolean release = new AtomicBoolean(false);
+ MessageSerializer<TestMessage> serializer = new TestMessageSerializationFactory(
+ new TestMessagesFactory()).createSerializer();
+ Serializer longWaitSerializer = new Serializer(TestMessageImpl.GROUP_TYPE, TestMessageImpl.TYPE,
+ (message, writer) -> release.get()
+ && serializer.writeMessage((TestMessage) message, writer));
+ try (Services services = createMessagingService(
+ senderNode,
+ senderNetworkConfig,
+ () -> {},
+ mockSerializationRegistry(longWaitSerializer));
+ Services receiverServices = createMessagingService(receiverNode, receiverNetworkConfig, () -> {})
+ ) {
+ CountDownLatch latch = new CountDownLatch(2);
+ receiverServices.messagingService.addMessageHandler(
+ TestMessageTypes.class,
+ (message, sender, correlationId) -> latch.countDown()
+ );
+
+ services.messagingService.send(receiverNode, TestMessageImpl.builder().build());
+ services.messagingService.send(receiverNode, AllTypesMessageImpl.builder().build());
+
+ assertThat(latch.getCount(), is(2L));
+ release.set(true);
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+ }
+ }
+
+ @Test
+ public void sendMessagesTwoChannels() throws Exception {
+ AtomicBoolean release = new AtomicBoolean(false);
+ MessageSerializer<TestMessage> serializer = new TestMessageSerializationFactory(
+ new TestMessagesFactory()).createSerializer();
+ Serializer longWaitSerializer = new Serializer(TestMessageImpl.GROUP_TYPE, TestMessageImpl.TYPE,
+ (message, writer) -> release.get()
+ && serializer.writeMessage((TestMessage) message, writer));
+ try (Services services = createMessagingService(
+ senderNode,
+ senderNetworkConfig,
+ () -> {},
+ mockSerializationRegistry(longWaitSerializer));
+ Services receiverServices = createMessagingService(receiverNode, receiverNetworkConfig, () -> {})
+ ) {
+ CountDownLatch latch = new CountDownLatch(2);
+ receiverServices.messagingService.addMessageHandler(
+ TestMessageTypes.class,
+ (message, sender, correlationId) -> latch.countDown()
+ );
+
+ services.messagingService.send(receiverNode, TestMessageImpl.builder().build());
+ services.messagingService.send(receiverNode, TEST_CHANNEL, AllTypesMessageImpl.builder().build());
+
+ await().timeout(1, TimeUnit.SECONDS)
+ .until(() -> latch.getCount() == 1);
+
+ release.set(true);
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+ }
+ }
+
+ private static MessageSerializationRegistry mockSerializationRegistry(Serializer... serializers) {
+ MessageSerializationRegistry defaultRegistry = defaultSerializationRegistry();
+
+ MessageSerializationRegistry wrapper = new MessageSerializationRegistry() {
+ @Override
+ public MessageSerializationRegistry registerFactory(short groupType, short messageType,
+ MessageSerializationFactory<?> factory) {
+ return this;
+ }
+
+ @Override
+ public <T extends NetworkMessage> MessageSerializer<T> createSerializer(short groupType, short messageType) {
+ for (Serializer serializer : serializers) {
+ if (serializer.groupType == groupType && serializer.messageType == messageType) {
+ return (MessageSerializer<T>) serializer.serializer;
+ }
+ }
+ return defaultRegistry.createSerializer(groupType, messageType);
+ }
+
+ @Override
+ public <T extends NetworkMessage> MessageDeserializer<T> createDeserializer(short groupType, short messageType) {
+ return defaultRegistry.createDeserializer(groupType, messageType);
+ }
+ };
+
+ return wrapper;
+ }
+
+ private static class Serializer {
+ private final short groupType;
+ private final short messageType;
+ private final MessageSerializer<? extends NetworkMessage> serializer;
+
+ private Serializer(short groupType, short messageType, MessageSerializer<? extends NetworkMessage> serializer) {
+ this.groupType = groupType;
+ this.messageType = messageType;
+ this.serializer = serializer;
+ }
+ }
+
private static void awaitQuietly(CountDownLatch latch) {
try {
latch.await();
@@ -146,6 +260,15 @@ class DefaultMessagingServiceTest {
}
private Services createMessagingService(ClusterNode node, NetworkConfiguration networkConfig, Runnable beforeHandshake) {
+ return createMessagingService(node, networkConfig, beforeHandshake, messageSerializationRegistry);
+ }
+
+ private Services createMessagingService(
+ ClusterNode node,
+ NetworkConfiguration networkConfig,
+ Runnable beforeHandshake,
+ MessageSerializationRegistry registry
+ ) {
ClassDescriptorRegistry classDescriptorRegistry = new ClassDescriptorRegistry();
ClassDescriptorFactory classDescriptorFactory = new ClassDescriptorFactory(classDescriptorRegistry);
UserObjectMarshaller marshaller = new DefaultUserObjectMarshaller(classDescriptorRegistry, classDescriptorFactory);
@@ -158,7 +281,7 @@ class DefaultMessagingServiceTest {
);
SerializationService serializationService = new SerializationService(
- messageSerializationRegistry,
+ registry,
new UserObjectSerializationContext(classDescriptorRegistry, classDescriptorFactory, marshaller)
);
@@ -182,10 +305,13 @@ class DefaultMessagingServiceTest {
return new Services(connectionManager, messagingService);
}
- private static RecoveryClientHandhakeManagerFactory clientHandshakeManagerFactoryAdding(Runnable beforeHandshake) {
- return new RecoveryClientHandhakeManagerFactory() {
+ private static RecoveryClientHandshakeManagerFactory clientHandshakeManagerFactoryAdding(Runnable beforeHandshake) {
+ return new RecoveryClientHandshakeManagerFactory() {
@Override
- public RecoveryClientHandshakeManager create(UUID launchId, String consistentId, short connectionId,
+ public RecoveryClientHandshakeManager create(
+ UUID launchId,
+ String consistentId,
+ short connectionId,
RecoveryDescriptorProvider recoveryDescriptorProvider) {
return new RecoveryClientHandshakeManager(launchId, consistentId, connectionId, recoveryDescriptorProvider) {
@Override
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
index 10e217ac2a..4e6b0caaed 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
@@ -28,6 +28,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.network.AbstractMessagingService;
import org.apache.ignite.network.AbstractTopologyService;
+import org.apache.ignite.network.ChannelType;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
@@ -171,7 +172,7 @@ public class ClusterServiceFactory {
/** {@inheritDoc} */
@Override
- public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg) {
+ public CompletableFuture<Void> send(ClusterNode recipient, ChannelType channelType, NetworkMessage msg) {
for (var handler : messagingServicesByNode.get(recipient.name()).messageHandlers(msg.groupType())) {
handler.onReceived(msg, localNodeName, null);
}
@@ -181,25 +182,25 @@ public class ClusterServiceFactory {
/** {@inheritDoc} */
@Override
- public CompletableFuture<Void> respond(ClusterNode recipient, NetworkMessage msg, long correlationId) {
+ public CompletableFuture<Void> respond(ClusterNode recipient, ChannelType type, NetworkMessage msg, long correlationId) {
throw new AssertionError("Not implemented yet");
}
/** {@inheritDoc} */
@Override
- public CompletableFuture<Void> respond(String recipientConsistentId, NetworkMessage msg, long correlationId) {
+ public CompletableFuture<Void> respond(String recipientConsistentId, ChannelType type, NetworkMessage msg, long correlationId) {
throw new AssertionError("Not implemented yet");
}
/** {@inheritDoc} */
@Override
- public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout) {
+ public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, ChannelType type, NetworkMessage msg, long timeout) {
throw new AssertionError("Not implemented yet");
}
/** {@inheritDoc} */
@Override
- public CompletableFuture<NetworkMessage> invoke(String recipientNodeId, NetworkMessage msg, long timeout) {
+ public CompletableFuture<NetworkMessage> invoke(String recipientNodeId, ChannelType type, NetworkMessage msg, long timeout) {
throw new AssertionError("Not implemented yet");
}