You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2023/03/22 11:57:17 UTC

[ignite-3] branch main updated: IGNITE-16948 Multichannel message transfer (#1780)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0489a9e3eb IGNITE-16948 Multichannel message transfer (#1780)
0489a9e3eb is described below

commit 0489a9e3eb9f62a66a5203301d30222b71dd0b1e
Author: Mikhail <Po...@users.noreply.github.com>
AuthorDate: Wed Mar 22 14:57:12 2023 +0300

    IGNITE-16948 Multichannel message transfer (#1780)
---
 .../internal/deployunit/DeploymentManagerImpl.java |  12 +-
 .../org/apache/ignite/network/ChannelType.java     | 127 +++++++++++++++++++
 .../ignite/network/ChannelTypeAlreadyExist.java}   |  26 ++--
 .../apache/ignite/network/MessagingService.java    | 100 +++++++++++++--
 .../org/apache/ignite/network/ChannelTypeTest.java |  70 +++++++++++
 modules/network/build.gradle                       |   1 +
 .../network/netty/ItConnectionManagerTest.java     |  77 ++++++------
 .../internal/network/netty/ConnectionManager.java  |  69 +++++------
 .../internal/network/netty/ConnectorKey.java       |  72 +++++++++++
 .../ignite/internal/network/netty/NettySender.java |  19 ++-
 .../recovery/RecoveryClientHandshakeManager.java   |  14 ++-
 ... => RecoveryClientHandshakeManagerFactory.java} |   2 +-
 .../recovery/RecoveryServerHandshakeManager.java   |  19 ++-
 .../ignite/network/DefaultMessagingService.java    |  40 +++---
 .../internal/network/netty/NettyClientTest.java    |   2 +-
 .../network/DefaultMessagingServiceTest.java       | 136 ++++++++++++++++++++-
 .../engine/framework/ClusterServiceFactory.java    |  11 +-
 17 files changed, 657 insertions(+), 140 deletions(-)

diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index 1d51272eff..855b902844 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.dsl.Operations;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.network.ChannelType;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 
@@ -83,6 +84,8 @@ public class DeploymentManagerImpl implements IgniteDeployment, IgniteComponent
 
     private static final String TMP_SUFFIX = ".tmp";
 
+    private static final ChannelType DEPLOYMENT_CHANNEL = ChannelType.register((short) 1, "DeploymentUnits");
+
     /**
      * Meta storage.
      */
@@ -220,7 +223,8 @@ public class DeploymentManagerImpl implements IgniteDeployment, IgniteComponent
                 }).thenApply(logicalTopologySnapshot -> {
                     for (ClusterNode node : logicalTopologySnapshot.nodes()) {
                         clusterService.messagingService()
-                                .invoke(node, UndeployUnitRequestImpl.builder()
+                                .invoke(node, DEPLOYMENT_CHANNEL,
+                                        UndeployUnitRequestImpl.builder()
                                                 .id(id)
                                                 .version(version.render())
                                                 .build(),
@@ -302,7 +306,7 @@ public class DeploymentManagerImpl implements IgniteDeployment, IgniteComponent
             if (throwable != null) {
                 builder.error(throwable);
             }
-            clusterService.messagingService().respond(senderConsistentId,
+            clusterService.messagingService().respond(senderConsistentId, DEPLOYMENT_CHANNEL,
                     builder.build(), correlationId);
         });
     }
@@ -317,12 +321,12 @@ public class DeploymentManagerImpl implements IgniteDeployment, IgniteComponent
         } catch (IOException e) {
             LOG.error("Failed to undeploy unit " + executeRequest.id() + ":" + executeRequest.version(), e);
             clusterService.messagingService()
-                    .respond(senderConsistentId, UndeployUnitResponseImpl.builder().error(e).build(), correlationId);
+                    .respond(senderConsistentId, DEPLOYMENT_CHANNEL, UndeployUnitResponseImpl.builder().error(e).build(), correlationId);
             return;
         }
 
         clusterService.messagingService()
-                .respond(senderConsistentId, UndeployUnitResponseImpl.builder().build(), correlationId);
+                .respond(senderConsistentId, DEPLOYMENT_CHANNEL, UndeployUnitResponseImpl.builder().build(), correlationId);
     }
 
     private CompletableFuture<Boolean> doDeploy(DeployUnitRequest executeRequest) {
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ChannelType.java b/modules/network-api/src/main/java/org/apache/ignite/network/ChannelType.java
new file mode 100644
index 0000000000..5ff0cc751b
--- /dev/null
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/ChannelType.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * Data class with channel information.
+ * May be used as a channel pointer in {@link MessagingService} for sending messages over an exclusive channel.
+ */
+public final class ChannelType {
+    static {
+        Map<Short, ChannelType> tmpChannels = new ConcurrentHashMap<>(IgniteUtils.capacity(Short.MAX_VALUE));
+        ChannelType defaultChannel = new ChannelType((short) 0, "Default");
+        tmpChannels.put((short) 0, defaultChannel);
+
+        DEFAULT = defaultChannel;
+        channels = tmpChannels;
+    }
+
+    public static final ChannelType DEFAULT;
+
+    private static final Map<Short, ChannelType> channels;
+
+    /**
+     * Channel identifier.
+     */
+    private final short id;
+
+    /**
+     * Channel name.
+     */
+    private final String name;
+
+    private ChannelType(short id, String name) {
+        this.id = id;
+        this.name = name;
+    }
+
+    /**
+     * Channel identifier, must be unique for each implementation.
+     *
+     * @return Channel identifier.
+     */
+    public short id() {
+        return id;
+    }
+
+    /**
+     * Returns channel name.
+     *
+     * @return Channel name.
+     */
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public int hashCode() {
+        return id();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof ChannelType)) {
+            return false;
+        }
+        ChannelType type = (ChannelType) obj;
+        return Objects.equals(id(), type.id());
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+
+    /**
+     * Tries to register a channel with the provided identifier. If the identifier is already used
+     * by another channel, {@link ChannelTypeAlreadyExist} is thrown.
+     *
+     * @param id Channel identifier. Must not be negative, otherwise {@link IllegalArgumentException} is thrown.
+     * @param name Channel name.
+     * @return Newly registered channel.
+     * @throws ChannelTypeAlreadyExist If the channel identifier is already registered, regardless of the name.
+     */
+    public static ChannelType register(short id, String name) {
+        if (id < 0) {
+            throw new IllegalArgumentException("Negative identifier is not supported.");
+        }
+        ChannelType newChannel = new ChannelType(id, name);
+        ChannelType channelType = channels.putIfAbsent(id, newChannel);
+        if (channelType != null) {
+            throw new ChannelTypeAlreadyExist(id, name);
+        }
+        return newChannel;
+    }
+
+    /**
+     * Returns the channel with the provided identifier or
+     * {@code null} if no channel with this identifier has been registered yet.
+     *
+     * @param id Channel identifier.
+     * @return Channel with the provided identifier or {@code null} if no channel with this identifier has been registered yet.
+     */
+    public static ChannelType getChannel(short id) {
+        return channels.get(id);
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandhakeManagerFactory.java b/modules/network-api/src/main/java/org/apache/ignite/network/ChannelTypeAlreadyExist.java
similarity index 51%
copy from modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandhakeManagerFactory.java
copy to modules/network-api/src/main/java/org/apache/ignite/network/ChannelTypeAlreadyExist.java
index 2ab0dfec29..f08a5e4dc1 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandhakeManagerFactory.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/ChannelTypeAlreadyExist.java
@@ -15,27 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.network.recovery;
-
-import java.util.UUID;
+package org.apache.ignite.network;
 
 /**
- * Factory producing {@link RecoveryClientHandshakeManager} instances.
+ * Thrown when registering a channel with an identifier that is already in use.
  */
-public interface RecoveryClientHandhakeManagerFactory {
+public class ChannelTypeAlreadyExist extends RuntimeException {
     /**
-     * Produces a {@link RecoveryClientHandshakeManager} instance.
+     * Constructor.
      *
-     * @param launchId                   ID of the launch.
-     * @param consistentId               Consistent ID of the node.
-     * @param connectionId               ID of the connection.
-     * @param recoveryDescriptorProvider Provider of recovery descriptors to be used.
-     * @return Created manager.
+     * @param id Channel identifier.
+     * @param name Channel name.
      */
-    RecoveryClientHandshakeManager create(
-            UUID launchId,
-            String consistentId,
-            short connectionId,
-            RecoveryDescriptorProvider recoveryDescriptorProvider
-    );
+    public ChannelTypeAlreadyExist(short id, String name) {
+        super("Channel " + name + " can't be registered because id " + id + " already used.");
+    }
 }
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
index 1b8f576598..72554af0ed 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
@@ -35,7 +35,7 @@ public interface MessagingService {
     void weakSend(ClusterNode recipient, NetworkMessage msg);
 
     /**
-     * Tries to send the given message asynchronously to the specific cluster member.
+     * Tries to send the given message via default channel asynchronously to the specific cluster member.
      *
      * <p>Guarantees:
      * <ul>
@@ -52,7 +52,29 @@ public interface MessagingService {
      * @param msg       Message which should be delivered.
      * @return Future of the send operation.
      */
-    CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg);
+    default CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg) {
+        return send(recipient, ChannelType.DEFAULT, msg);
+    }
+
+    /**
+     * Tries to send the given message via specified channel asynchronously to the specific cluster member.
+     *
+     * <p>Guarantees:
+     * <ul>
+     *     <li>Messages sent to the same receiver will be delivered in the same order as they were sent;</li>
+     *     <li>If a message N has been successfully delivered to a member, then all messages to the same receiver
+     *     preceding N have also been successfully delivered.</li>
+     * </ul>
+     *
+     * <p>Please note that the guarantees only work for same (sender, receiver) pairs. That is, if A sends m1 and m2
+     * to B, then the guarantees are maintained. If, on the other hand, A sends m1 to B and m2 to C, then no guarantees
+     * exist.
+     *
+     * @param recipient Recipient of the message.
+     * @param msg       Message which should be delivered.
+     * @return Future of the send operation.
+     */
+    CompletableFuture<Void> send(ClusterNode recipient, ChannelType channelType, NetworkMessage msg);
 
     /**
      * Sends a response to a {@link #invoke} request.
@@ -63,12 +85,25 @@ public interface MessagingService {
      * @param correlationId Correlation id when replying to the request.
      * @return Future of the send operation.
      */
-    CompletableFuture<Void> respond(ClusterNode recipient, NetworkMessage msg, long correlationId);
+    default CompletableFuture<Void> respond(ClusterNode recipient, NetworkMessage msg, long correlationId) {
+        return respond(recipient, ChannelType.DEFAULT, msg, correlationId);
+    }
 
     /**
      * Sends a response to a {@link #invoke} request.
      * Guarantees are the same as for the {@link #send(ClusterNode, NetworkMessage)}.
      *
+     * @param recipient     Recipient of the message.
+     * @param msg           Message which should be delivered.
+     * @param correlationId Correlation id when replying to the request.
+     * @return Future of the send operation.
+     */
+    CompletableFuture<Void> respond(ClusterNode recipient, ChannelType channelType, NetworkMessage msg, long correlationId);
+
+    /**
+     * Sends a response to a {@link #invoke} request via default channel.
+     * Guarantees are the same as for the {@link #send(ClusterNode, NetworkMessage)}.
+     *
      * <p>If the recipient cannot be resolved (because it has already left the physical topology), the returned future is resolved
      * with the corresponding exception ({@link UnresolvableConsistentIdException}).
      *
@@ -77,10 +112,28 @@ public interface MessagingService {
      * @param correlationId Correlation id when replying to the request.
      * @return Future of the send operation.
      */
-    CompletableFuture<Void> respond(String recipientConsistentId, NetworkMessage msg, long correlationId);
+    default CompletableFuture<Void> respond(String recipientConsistentId, NetworkMessage msg, long correlationId) {
+        return respond(recipientConsistentId, ChannelType.DEFAULT, msg, correlationId);
+    }
 
     /**
-     * Sends a message asynchronously with same guarantees as {@link #send(ClusterNode, NetworkMessage)} and returns a future that will be
+     * Sends a response to a {@link #invoke} request via specified channel.
+     * Guarantees are the same as for the {@link #send(ClusterNode, NetworkMessage)}.
+     *
+     * <p>If the recipient cannot be resolved (because it has already left the physical topology), the returned future is resolved
+     * with the corresponding exception ({@link UnresolvableConsistentIdException}).
+     *
+     * @param recipientConsistentId Consistent ID of the recipient of the message.
+     * @param channelType Channel which will be used for message transfer.
+     * @param msg Message which should be delivered.
+     * @param correlationId Correlation id when replying to the request.
+     * @return Future of the send operation.
+     */
+    CompletableFuture<Void> respond(String recipientConsistentId, ChannelType channelType, NetworkMessage msg, long correlationId);
+
+    /**
+     * Sends a message via default channel asynchronously with same guarantees as
+     * {@link #send(ClusterNode, NetworkMessage)} and returns a future that will be
      * completed successfully upon receiving a response.
      *
      * @param recipient Recipient of the message.
@@ -88,18 +141,49 @@ public interface MessagingService {
      * @param timeout   Waiting for response timeout in milliseconds.
      * @return A future holding the response or error if the expected response was not received.
      */
-    CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout);
+    default CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout) {
+        return invoke(recipient, ChannelType.DEFAULT, msg, timeout);
+    }
+
+    /**
+     * Sends a message asynchronously via specified channel with same guarantees as
+     * {@link #send(ClusterNode, NetworkMessage)} and returns a future that will be
+     * completed successfully upon receiving a response.
+     *
+     * @param recipient Recipient of the message.
+     * @param channelType Channel which will be used for message transfer.
+     * @param msg Message which should be delivered.
+     * @param timeout Waiting for response timeout in milliseconds.
+     * @return A future holding the response or error if the expected response was not received.
+     */
+    CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, ChannelType channelType, NetworkMessage msg, long timeout);
+
+    /**
+     * Sends a message via default channel asynchronously with same guarantees as
+     * {@link #send(ClusterNode, NetworkMessage)} and returns a future that will be
+     * completed successfully upon receiving a response.
+     *
+     * @param recipientConsistentId Consistent ID of the recipient of the message.
+     * @param msg The message.
+     * @param timeout Waiting for response timeout in milliseconds.
+     * @return A future holding the response or error if the expected response was not received.
+     */
+    default CompletableFuture<NetworkMessage> invoke(String recipientConsistentId, NetworkMessage msg, long timeout) {
+        return invoke(recipientConsistentId, ChannelType.DEFAULT, msg, timeout);
+    }
 
     /**
-     * Sends a message asynchronously with same guarantees as {@link #send(ClusterNode, NetworkMessage)} and returns a future that will be
+     * Sends a message via specified channel asynchronously with same guarantees as
+     * {@link #send(ClusterNode, NetworkMessage)} and returns a future that will be
      * completed successfully upon receiving a response.
      *
      * @param recipientConsistentId Consistent ID of the recipient of the message.
+     * @param channelType Channel which will be used for message transfer.
      * @param msg The message.
      * @param timeout Waiting for response timeout in milliseconds.
      * @return A future holding the response or error if the expected response was not received.
      */
-    CompletableFuture<NetworkMessage> invoke(String recipientConsistentId, NetworkMessage msg, long timeout);
+    CompletableFuture<NetworkMessage> invoke(String recipientConsistentId, ChannelType channelType, NetworkMessage msg, long timeout);
 
     /**
      * Registers a listener for a group of network message events.
diff --git a/modules/network-api/src/test/java/org/apache/ignite/network/ChannelTypeTest.java b/modules/network-api/src/test/java/org/apache/ignite/network/ChannelTypeTest.java
new file mode 100644
index 0000000000..47058ea4e8
--- /dev/null
+++ b/modules/network-api/src/test/java/org/apache/ignite/network/ChannelTypeTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test suite for {@link ChannelType}.
+ */
+public class ChannelTypeTest {
+    private static short OFFSET = Short.MAX_VALUE - 100;
+
+    @Test
+    public void testRegister() {
+        ChannelType testRegister = ChannelType.register(OFFSET, "TestRegister");
+
+        assertThat(testRegister, is(ChannelType.getChannel(OFFSET)));
+        OFFSET++;
+    }
+
+    @Test
+    public void testRegisterAlready() {
+        ChannelType.register(OFFSET, "TestRegister1");
+
+        assertThrows(ChannelTypeAlreadyExist.class, () -> ChannelType.register(OFFSET, "TestRegister2"));
+
+        OFFSET++;
+    }
+
+    @Test
+    public void testRegisterSame() {
+        ChannelType.register(OFFSET, "TestRegister1");
+
+        assertThrows(ChannelTypeAlreadyExist.class, () -> ChannelType.register(OFFSET, "TestRegister1"));
+
+        OFFSET++;
+    }
+
+    @Test
+    public void testGetNotRegistered() {
+        ChannelType channel = ChannelType.getChannel(OFFSET);
+
+        assertThat(channel, nullValue());
+    }
+
+    @Test
+    public void testRegisterNegativeIndex() {
+        assertThrows(IllegalArgumentException.class, () -> ChannelType.register((short) -1, "test"));
+    }
+}
diff --git a/modules/network/build.gradle b/modules/network/build.gradle
index d145b7c496..27f5816aa5 100644
--- a/modules/network/build.gradle
+++ b/modules/network/build.gradle
@@ -62,6 +62,7 @@ dependencies {
     testImplementation libs.slf4j.jdk14
     testImplementation libs.bytebuddy
     testImplementation libs.compileTesting
+    testImplementation libs.awaitility
 
     testFixturesAnnotationProcessor project(":ignite-network-annotation-processor")
     testFixturesImplementation project(':ignite-configuration')
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index ea33e7d394..bfd7eace45 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ChannelType;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.OutNetworkObject;
@@ -95,22 +96,23 @@ public class ItConnectionManagerTest {
         int port1 = 4000;
         int port2 = 4001;
 
-        ConnectionManagerWrapper manager1 = startManager(port1);
-        ConnectionManagerWrapper manager2 = startManager(port2);
 
-        var fut = new CompletableFuture<NetworkMessage>();
+        try (ConnectionManagerWrapper manager1 = startManager(port1);
+                ConnectionManagerWrapper manager2 = startManager(port2)) {
+            var fut = new CompletableFuture<NetworkMessage>();
 
-        manager2.connectionManager.addListener((obj) -> fut.complete(obj.message()));
+            manager2.connectionManager.addListener((obj) -> fut.complete(obj.message()));
 
-        NettySender sender = manager1.openChannelTo(manager2).get(3, TimeUnit.SECONDS);
+            NettySender sender = manager1.openChannelTo(manager2).get(3, TimeUnit.SECONDS);
 
-        TestMessage testMessage = messageFactory.testMessage().msg(msgText).build();
+            TestMessage testMessage = messageFactory.testMessage().msg(msgText).build();
 
-        sender.send(new OutNetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
+            sender.send(new OutNetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
 
-        NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
+            NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
 
-        assertEquals(msgText, ((TestMessage) receivedMessage).msg());
+            assertEquals(msgText, ((TestMessage) receivedMessage).msg());
+        }
     }
 
     /**
@@ -127,40 +129,40 @@ public class ItConnectionManagerTest {
         int port1 = 4000;
         int port2 = 4001;
 
-        ConnectionManagerWrapper manager1 = startManager(port1);
-        ConnectionManagerWrapper manager2 = startManager(port2);
+        try (ConnectionManagerWrapper manager1 = startManager(port1);
+                ConnectionManagerWrapper manager2 = startManager(port2)) {
+            var fut = new CompletableFuture<NetworkMessage>();
 
-        var fut = new CompletableFuture<NetworkMessage>();
-
-        manager1.connectionManager.addListener((obj) -> fut.complete(obj.message()));
+            manager1.connectionManager.addListener((obj) -> fut.complete(obj.message()));
 
-        NettySender senderFrom1to2 = manager1.openChannelTo(manager2).get(3, TimeUnit.SECONDS);
+            NettySender senderFrom1to2 = manager1.openChannelTo(manager2).get(3, TimeUnit.SECONDS);
 
-        // Ensure a handshake has finished on both sides by sending a message.
-        // TODO: IGNITE-16947 When the recovery protocol is implemented replace this with simple
-        // CompletableFuture#get called on the send future.
-        var messageReceivedOn2 = new CompletableFuture<Void>();
+            // Ensure a handshake has finished on both sides by sending a message.
+            // TODO: IGNITE-16947 When the recovery protocol is implemented replace this with simple
+            // CompletableFuture#get called on the send future.
+            var messageReceivedOn2 = new CompletableFuture<Void>();
 
-        // If the message is received, that means that the handshake was successfully performed.
-        manager2.connectionManager.addListener((message) -> messageReceivedOn2.complete(null));
+            // If the message is received, that means that the handshake was successfully performed.
+            manager2.connectionManager.addListener((message) -> messageReceivedOn2.complete(null));
 
-        senderFrom1to2.send(new OutNetworkObject(testMessage, Collections.emptyList()));
+            senderFrom1to2.send(new OutNetworkObject(testMessage, Collections.emptyList()));
 
-        messageReceivedOn2.get(3, TimeUnit.SECONDS);
+            messageReceivedOn2.get(3, TimeUnit.SECONDS);
 
-        NettySender senderFrom2to1 = manager2.openChannelTo(manager1).get(3, TimeUnit.SECONDS);
+            NettySender senderFrom2to1 = manager2.openChannelTo(manager1).get(3, TimeUnit.SECONDS);
 
-        InetSocketAddress clientLocalAddress = (InetSocketAddress) senderFrom1to2.channel().localAddress();
+            InetSocketAddress clientLocalAddress = (InetSocketAddress) senderFrom1to2.channel().localAddress();
 
-        InetSocketAddress clientRemoteAddress = (InetSocketAddress) senderFrom2to1.channel().remoteAddress();
+            InetSocketAddress clientRemoteAddress = (InetSocketAddress) senderFrom2to1.channel().remoteAddress();
 
-        assertEquals(clientLocalAddress, clientRemoteAddress);
+            assertEquals(clientLocalAddress, clientRemoteAddress);
 
-        senderFrom2to1.send(new OutNetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
+            senderFrom2to1.send(new OutNetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
 
-        NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
+            NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
 
-        assertEquals(msgText, ((TestMessage) receivedMessage).msg());
+            assertEquals(msgText, ((TestMessage) receivedMessage).msg());
+        }
     }
 
     /**
@@ -250,9 +252,7 @@ public class ItConnectionManagerTest {
     public void testConnectMisconfiguredServer() throws Exception {
         ConnectionManagerWrapper client = startManager(4000);
 
-        ConnectionManagerWrapper server = startManager(4001, mockSerializationRegistry());
-
-        try {
+        try (ConnectionManagerWrapper server = startManager(4001, mockSerializationRegistry())) {
             client.openChannelTo(server).get(3, TimeUnit.SECONDS);
         } catch (ExecutionException e) {
             assertThat(e.getCause(), isA(IOException.class));
@@ -266,9 +266,7 @@ public class ItConnectionManagerTest {
     public void testConnectMisconfiguredClient() throws Exception {
         ConnectionManagerWrapper client = startManager(4000, mockSerializationRegistry());
 
-        ConnectionManagerWrapper server = startManager(4001);
-
-        try {
+        try (ConnectionManagerWrapper server = startManager(4001)) {
             client.openChannelTo(server).get(3, TimeUnit.SECONDS);
         } catch (ExecutionException e) {
             assertThat(e.getCause(), isA(DecoderException.class));
@@ -376,7 +374,12 @@ public class ItConnectionManagerTest {
         }
 
         OrderingFuture<NettySender> openChannelTo(ConnectionManagerWrapper recipient) {
-            return connectionManager.channel(recipient.connectionManager.consistentId(), recipient.connectionManager.localAddress());
+            return connectionManager.channel(
+                    recipient.connectionManager.consistentId(),
+                    ChannelType.DEFAULT,
+                    recipient.connectionManager.localAddress()
+            );
         }
+
     }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 18ef33dbfd..66a5ea74b6 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.network.netty;
 
+import static org.apache.ignite.network.ChannelType.getChannel;
+
 import io.netty.bootstrap.Bootstrap;
 import java.net.InetSocketAddress;
 import java.util.Collection;
@@ -37,12 +39,13 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.configuration.NetworkView;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
-import org.apache.ignite.internal.network.recovery.RecoveryClientHandhakeManagerFactory;
 import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManagerFactory;
 import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
 import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
 import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ChannelType;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
@@ -66,12 +69,11 @@ public class ConnectionManager {
     /** Server. */
     private final NettyServer server;
 
-    // TODO: IGNITE-16948 Should be a map consistentId -> connectionId -> sender
     /** Channels map from consistentId to {@link NettySender}. */
-    private final Map<String, NettySender> channels = new ConcurrentHashMap<>();
+    private final Map<ConnectorKey<String>, NettySender> channels = new ConcurrentHashMap<>();
 
     /** Clients. */
-    private final Map<InetSocketAddress, NettyClient> clients = new ConcurrentHashMap<>();
+    private final Map<ConnectorKey<InetSocketAddress>, NettyClient> clients = new ConcurrentHashMap<>();
 
     /** Serialization service. */
     private final SerializationService serializationService;
@@ -86,7 +88,7 @@ public class ConnectionManager {
     private final UUID launchId;
 
     /** Factory producing {@link RecoveryClientHandshakeManager} instances. */
-    private final RecoveryClientHandhakeManagerFactory clientHandhakeManagerFactory;
+    private final RecoveryClientHandshakeManagerFactory clientHandshakeManagerFactory;
 
     /** Start flag. */
     private final AtomicBoolean started = new AtomicBoolean(false);
@@ -122,7 +124,7 @@ public class ConnectionManager {
                 launchId,
                 consistentId,
                 bootstrapFactory,
-                new DefaultRecoveryClientHandhakeManagerFactory()
+                new DefaultRecoveryClientHandshakeManagerFactory()
         );
     }
 
@@ -134,7 +136,7 @@ public class ConnectionManager {
      * @param launchId                      Launch id of this node.
      * @param consistentId                  Consistent id of this node.
      * @param bootstrapFactory              Bootstrap factory.
-     * @param clientHandhakeManagerFactory  Factory for {@link RecoveryClientHandshakeManager} instances.
+     * @param clientHandshakeManagerFactory Factory for {@link RecoveryClientHandshakeManager} instances.
      */
     public ConnectionManager(
             NetworkView networkConfiguration,
@@ -142,12 +144,12 @@ public class ConnectionManager {
             UUID launchId,
             String consistentId,
             NettyBootstrapFactory bootstrapFactory,
-            RecoveryClientHandhakeManagerFactory clientHandhakeManagerFactory
+            RecoveryClientHandshakeManagerFactory clientHandshakeManagerFactory
     ) {
         this.serializationService = serializationService;
         this.launchId = launchId;
         this.consistentId = consistentId;
-        this.clientHandhakeManagerFactory = clientHandhakeManagerFactory;
+        this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
         this.networkConfiguration = networkConfiguration;
 
         this.server = new NettyServer(
@@ -208,13 +210,13 @@ public class ConnectionManager {
      * @param address      Another node's address.
      * @return Sender.
      */
-    public OrderingFuture<NettySender> channel(@Nullable String consistentId, InetSocketAddress address) {
+    public OrderingFuture<NettySender> channel(@Nullable String consistentId, ChannelType type, InetSocketAddress address) {
         if (consistentId != null) {
             // If consistent id is known, try looking up a channel by consistent id. There can be an outbound connection
             // or an inbound connection associated with that consistent id.
             NettySender channel = channels.compute(
-                    consistentId,
-                    (addr, sender) -> (sender == null || !sender.isOpen()) ? null : sender
+                    new ConnectorKey<>(consistentId, type),
+                    (key, sender) -> (sender == null || !sender.isOpen()) ? null : sender
             );
 
             if (channel != null) {
@@ -225,10 +227,9 @@ public class ConnectionManager {
         // Get an existing client or create a new one. NettyClient provides a CompletableFuture that resolves
         // when the client is ready for write operations, so previously started client, that didn't establish connection
         // or didn't perform the handshake operation, can be reused.
-        // TODO: IGNITE-16948 Connection id may be different from 0
         NettyClient client = clients.compute(
-                address,
-                (addr, existingClient) -> isClientConnected(existingClient) ? existingClient : connect(addr, (short) 0)
+                new ConnectorKey<>(address, type),
+                (key, existingClient) -> isClientConnected(existingClient) ? existingClient : connect(key.id(), key.type())
         );
 
         return client.sender();
@@ -253,7 +254,8 @@ public class ConnectionManager {
      * @param channel Channel from client to this {@link #server}.
      */
     private void onNewIncomingChannel(NettySender channel) {
-        NettySender oldChannel = channels.put(channel.consistentId(), channel);
+        ConnectorKey<String> key = new ConnectorKey<>(channel.consistentId(), getChannel(channel.channelId()));
+        NettySender oldChannel = channels.put(key, channel);
 
         if (oldChannel != null) {
             oldChannel.close();
@@ -266,20 +268,21 @@ public class ConnectionManager {
      * @param address Target address.
      * @return New netty client.
      */
-    private NettyClient connect(InetSocketAddress address, short connectionId) {
+    private NettyClient connect(InetSocketAddress address, ChannelType channelType) {
         var client = new NettyClient(
                 address,
                 serializationService,
-                createClientHandshakeManager(connectionId),
+                createClientHandshakeManager(channelType.id()),
                 this::onMessage,
                 this.networkConfiguration.ssl()
         );
 
         client.start(clientBootstrap).whenComplete((sender, throwable) -> {
             if (throwable == null) {
-                channels.put(sender.consistentId(), sender);
+                ConnectorKey<String> key = new ConnectorKey<>(sender.consistentId(), getChannel(sender.channelId()));
+                channels.put(key, sender);
             } else {
-                clients.remove(address);
+                clients.remove(new ConnectorKey<>(address, channelType));
             }
         });
 
@@ -331,7 +334,7 @@ public class ConnectionManager {
     }
 
     private HandshakeManager createClientHandshakeManager(short connectionId) {
-        return clientHandhakeManagerFactory.create(
+        return clientHandshakeManagerFactory.create(
                 launchId,
                 consistentId,
                 connectionId,
@@ -353,6 +356,11 @@ public class ConnectionManager {
         return server;
     }
 
+    @TestOnly
+    public SerializationService serializationService() {
+        return serializationService;
+    }
+
     /**
      * Returns this node's consistent id.
      *
@@ -372,23 +380,16 @@ public class ConnectionManager {
         return Collections.unmodifiableCollection(clients.values());
     }
 
-    /**
-     * Returns map of the channels.
-     *
-     * @return Map of the channels.
-     */
-    @TestOnly
-    public Map<String, NettySender> channels() {
-        return Collections.unmodifiableMap(channels);
-    }
-
     /**
      * Factory producing vanilla {@link RecoveryClientHandshakeManager} instances.
      */
-    private static class DefaultRecoveryClientHandhakeManagerFactory implements RecoveryClientHandhakeManagerFactory {
+    private static class DefaultRecoveryClientHandshakeManagerFactory implements RecoveryClientHandshakeManagerFactory {
         @Override
-        public RecoveryClientHandshakeManager create(UUID launchId, String consistentId, short connectionId,
-                RecoveryDescriptorProvider recoveryDescriptorProvider) {
+        public RecoveryClientHandshakeManager create(UUID launchId,
+                String consistentId,
+                short connectionId,
+                RecoveryDescriptorProvider recoveryDescriptorProvider
+        ) {
             return new RecoveryClientHandshakeManager(launchId, consistentId, connectionId, recoveryDescriptorProvider);
         }
     }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectorKey.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectorKey.java
new file mode 100644
index 0000000000..9b0f39e4a9
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectorKey.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import org.apache.ignite.network.ChannelType;
+
+/**
+ * Composite key object for connectors.
+ *
+ * @param <T> identifier type.
+ */
+public class ConnectorKey<T> {
+    private final T id;
+
+    private final ChannelType type;
+
+    /**
+     * Creates a key that pairs a connector identifier with a channel type.
+     *
+     * @param id Connector identifier.
+     * @param type Channel type.
+     */
+    public ConnectorKey(T id, ChannelType type) {
+        this.id = id;
+        this.type = type;
+    }
+
+    /** Returns the connector identifier. */
+    public T id() {
+        return id;
+    }
+
+    /** Returns the channel type. */
+    public ChannelType type() {
+        return type;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ConnectorKey<?> other = (ConnectorKey<?>) o;
+        return (id == null ? other.id == null : id.equals(other.id))
+                && (type == null ? other.type == null : type.equals(other.type));
+    }
+
+    @Override
+    public int hashCode() {
+        int idHash = id == null ? 0 : id.hashCode();
+        int typeHash = type == null ? 0 : type.hashCode();
+        return 31 * idHash + typeHash;
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
index 3239e12103..2294646d60 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
@@ -39,17 +39,21 @@ public class NettySender {
     /** Consistent id of the remote node. */
     private final String consistentId;
 
+    private final short channelId;
+
     /**
      * Constructor.
      *
-     * @param channel      Netty channel.
-     * @param launchId     Launch id of the remote node.
+     * @param channel Netty channel.
+     * @param launchId Launch id of the remote node.
      * @param consistentId Consistent id of the remote node.
+     * @param channelId channel identifier.
      */
-    public NettySender(Channel channel, String launchId, String consistentId) {
+    public NettySender(Channel channel, String launchId, String consistentId, short channelId) {
         this.channel = channel;
         this.launchId = launchId;
         this.consistentId = consistentId;
+        this.channelId = channelId;
     }
 
     /**
@@ -80,6 +84,15 @@ public class NettySender {
         return consistentId;
     }
 
+    /**
+     * Returns channel identifier.
+     *
+     * @return Channel identifier.
+     */
+    public short channelId() {
+        return channelId;
+    }
+
     /**
      * Closes channel.
      */
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index 87b2a889e7..8925b19834 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -87,7 +87,9 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
      * @param recoveryDescriptorProvider Recovery descriptor provider.
      */
     public RecoveryClientHandshakeManager(
-            UUID launchId, String consistentId, short connectionId,
+            UUID launchId,
+            String consistentId,
+            short connectionId,
             RecoveryDescriptorProvider recoveryDescriptorProvider) {
         this.launchId = launchId;
         this.consistentId = consistentId;
@@ -112,8 +114,12 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
             this.remoteLaunchId = msg.launchId();
             this.remoteConsistentId = msg.consistentId();
 
-            this.recoveryDescriptor = recoveryDescriptorProvider.getRecoveryDescriptor(remoteConsistentId, remoteLaunchId, connectionId,
-                    false);
+            this.recoveryDescriptor = recoveryDescriptorProvider.getRecoveryDescriptor(
+                    remoteConsistentId,
+                    remoteLaunchId,
+                    connectionId,
+                    false
+            );
 
             handshake(recoveryDescriptor);
 
@@ -199,7 +205,7 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
         // Removes handshake handler from the pipeline as the handshake is finished
         this.ctx.pipeline().remove(this.handler);
 
-        handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId));
+        handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId, connectionId));
     }
 
     @TestOnly
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandhakeManagerFactory.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerFactory.java
similarity index 96%
rename from modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandhakeManagerFactory.java
rename to modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerFactory.java
index 2ab0dfec29..b4f88559ce 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandhakeManagerFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerFactory.java
@@ -22,7 +22,7 @@ import java.util.UUID;
 /**
  * Factory producing {@link RecoveryClientHandshakeManager} instances.
  */
-public interface RecoveryClientHandhakeManagerFactory {
+public interface RecoveryClientHandshakeManagerFactory {
     /**
      * Produces a {@link RecoveryClientHandshakeManager} instance.
      *
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
index 3a252b33ef..54501be1d8 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
@@ -61,6 +61,8 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
     /** Remote node's consistent id. */
     private String remoteConsistentId;
 
+    private short remoteChannelId;
+
     /** Netty pipeline channel handler context. */
     private ChannelHandlerContext ctx;
 
@@ -88,8 +90,11 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
      * @param recoveryDescriptorProvider Recovery descriptor provider.
      */
     public RecoveryServerHandshakeManager(
-            UUID launchId, String consistentId, NetworkMessagesFactory messageFactory,
-            RecoveryDescriptorProvider recoveryDescriptorProvider) {
+            UUID launchId,
+            String consistentId,
+            NetworkMessagesFactory messageFactory,
+            RecoveryDescriptorProvider recoveryDescriptorProvider
+    ) {
         this.launchId = launchId;
         this.consistentId = consistentId;
         this.messageFactory = messageFactory;
@@ -132,9 +137,13 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
             this.remoteLaunchId = msg.launchId();
             this.remoteConsistentId = msg.consistentId();
             this.receivedCount = msg.receivedCount();
+            this.remoteChannelId = msg.connectionId();
 
-            this.recoveryDescriptor = recoveryDescriptorProvider.getRecoveryDescriptor(remoteConsistentId, remoteLaunchId,
-                    msg.connectionId(), true);
+            this.recoveryDescriptor = recoveryDescriptorProvider.getRecoveryDescriptor(
+                    remoteConsistentId,
+                    remoteLaunchId,
+                    remoteChannelId,
+                    true);
 
             handshake(recoveryDescriptor);
 
@@ -216,7 +225,7 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
         // Removes handshake handler from the pipeline as the handshake is finished
         this.ctx.pipeline().remove(this.handler);
 
-        handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId));
+        handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId, remoteChannelId));
     }
 
     @TestOnly
diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 7bde48da3f..fef1ed918e 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiPredicate;
 import java.util.function.Function;
+import org.apache.ignite.internal.future.OrderingFuture;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
@@ -127,17 +128,17 @@ public class DefaultMessagingService extends AbstractMessagingService {
     }
 
     @Override
-    public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg) {
-        return send0(recipient, msg, null);
+    public CompletableFuture<Void> send(ClusterNode recipient, ChannelType channelType, NetworkMessage msg) {
+        return send0(recipient, channelType, msg, null);
     }
 
     @Override
-    public CompletableFuture<Void> respond(ClusterNode recipient, NetworkMessage msg, long correlationId) {
-        return send0(recipient, msg, correlationId);
+    public CompletableFuture<Void> respond(ClusterNode recipient, ChannelType type, NetworkMessage msg, long correlationId) {
+        return send0(recipient, type, msg, correlationId);
     }
 
     @Override
-    public CompletableFuture<Void> respond(String recipientConsistentId, NetworkMessage msg, long correlationId) {
+    public CompletableFuture<Void> respond(String recipientConsistentId, ChannelType type, NetworkMessage msg, long correlationId) {
         ClusterNode recipient = topologyService.getByConsistentId(recipientConsistentId);
 
         if (recipient == null) {
@@ -150,13 +151,13 @@ public class DefaultMessagingService extends AbstractMessagingService {
     }
 
     @Override
-    public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout) {
-        return invoke0(recipient, msg, timeout);
+    public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, ChannelType type, NetworkMessage msg, long timeout) {
+        return invoke0(recipient, type, msg, timeout);
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<NetworkMessage> invoke(String recipientConsistentId, NetworkMessage msg, long timeout) {
+    public CompletableFuture<NetworkMessage> invoke(String recipientConsistentId, ChannelType type, NetworkMessage msg, long timeout) {
         ClusterNode recipient = topologyService.getByConsistentId(recipientConsistentId);
 
         if (recipient == null) {
@@ -165,7 +166,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
             );
         }
 
-        return invoke0(recipient, msg, timeout);
+        return invoke0(recipient, type, msg, timeout);
     }
 
     /**
@@ -176,7 +177,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
      * @param correlationId Correlation id. Not null iff the message is a response to a {@link #invoke} request.
      * @return Future of the send operation.
      */
-    private CompletableFuture<Void> send0(ClusterNode recipient, NetworkMessage msg, @Nullable Long correlationId) {
+    private CompletableFuture<Void> send0(ClusterNode recipient, ChannelType type, NetworkMessage msg, @Nullable Long correlationId) {
         if (connectionManager.isStopped()) {
             return failedFuture(new NodeStoppingException());
         }
@@ -200,7 +201,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
         NetworkMessage message = correlationId != null ? responseFromMessage(msg, correlationId) : msg;
 
-        return sendMessage0(recipient.name(), recipientAddress, message);
+        return sendMessage0(recipient.name(), type, recipientAddress, message);
     }
 
     private boolean shouldDropMessage(ClusterNode recipient, NetworkMessage msg) {
@@ -217,7 +218,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
      * @param timeout Invocation timeout.
      * @return A future holding the response or error if the expected response was not received.
      */
-    private CompletableFuture<NetworkMessage> invoke0(ClusterNode recipient, NetworkMessage msg, long timeout) {
+    private CompletableFuture<NetworkMessage> invoke0(ClusterNode recipient, ChannelType type, NetworkMessage msg, long timeout) {
         if (connectionManager.isStopped()) {
             return failedFuture(new NodeStoppingException());
         }
@@ -244,21 +245,27 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
         InvokeRequest message = requestFromMessage(msg, correlationId);
 
-        return sendMessage0(recipient.name(), recipientAddress, message).thenCompose(unused -> responseFuture);
+        return sendMessage0(recipient.name(), type, recipientAddress, message).thenCompose(unused -> responseFuture);
     }
 
     /**
      * Sends network object.
      *
      * @param consistentId Target consistent ID. Can be {@code null} if the node has not been added to the topology.
+     * @param type Channel type for send.
      * @param addr Target address.
      * @param message Message.
      *
      * @return Future of the send operation.
      */
-    private CompletableFuture<Void> sendMessage0(@Nullable String consistentId, InetSocketAddress addr, NetworkMessage message) {
+    private CompletableFuture<Void> sendMessage0(
+            @Nullable String consistentId,
+            ChannelType type,
+            InetSocketAddress addr,
+            NetworkMessage message
+    ) {
         if (isInNetworkThread()) {
-            return CompletableFuture.supplyAsync(() -> sendMessage0(consistentId, addr, message), outboundExecutor)
+            return CompletableFuture.supplyAsync(() -> sendMessage0(consistentId, type, addr, message), outboundExecutor)
                     .thenCompose(Function.identity());
         }
 
@@ -270,7 +277,8 @@ public class DefaultMessagingService extends AbstractMessagingService {
             return failedFuture(new IgniteException("Failed to marshal message: " + e.getMessage(), e));
         }
 
-        return connectionManager.channel(consistentId, addr)
+        OrderingFuture<NettySender> channel = connectionManager.channel(consistentId, type, addr);
+        return channel
                 .thenComposeToCompletable(sender -> sender.send(new OutNetworkObject(message, descriptors)));
     }
 
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
index 466278e413..4a9337c38d 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
@@ -255,7 +255,7 @@ public class NettyClientTest {
 
         /** Constructor. */
         private MockClientHandshakeManager(Channel channel) {
-            this.sender = new NettySender(channel, "", "");
+            this.sender = new NettySender(channel, "", "", (short) 0);
         }
 
         /** {@inheritDoc} */
diff --git a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
index 5b1754b349..5987dcf41f 100644
--- a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
@@ -19,8 +19,10 @@ package org.apache.ignite.network;
 
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.lenient;
@@ -33,16 +35,20 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
+import org.apache.ignite.internal.network.messages.AllTypesMessageImpl;
 import org.apache.ignite.internal.network.messages.TestMessage;
+import org.apache.ignite.internal.network.messages.TestMessageImpl;
+import org.apache.ignite.internal.network.messages.TestMessageSerializationFactory;
 import org.apache.ignite.internal.network.messages.TestMessageTypes;
 import org.apache.ignite.internal.network.messages.TestMessagesFactory;
 import org.apache.ignite.internal.network.netty.ConnectionManager;
-import org.apache.ignite.internal.network.recovery.RecoveryClientHandhakeManagerFactory;
 import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManagerFactory;
 import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
@@ -51,7 +57,10 @@ import org.apache.ignite.internal.network.serialization.UserObjectSerializationC
 import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
 import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.serialization.MessageDeserializer;
+import org.apache.ignite.network.serialization.MessageSerializationFactory;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.network.serialization.MessageSerializer;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -64,6 +73,8 @@ class DefaultMessagingServiceTest {
     private static final int SENDER_PORT = 2001;
     private static final int RECEIVER_PORT = 2002;
 
+    private static final ChannelType TEST_CHANNEL = ChannelType.register(Short.MAX_VALUE, "Test");
+
     @Mock
     private TopologyService topologyService;
 
@@ -133,6 +144,109 @@ class DefaultMessagingServiceTest {
         }
     }
 
+    @Test
+    public void sendMessagesOneChannel() throws Exception { // Two sends, no explicit ChannelType; no delivery before release.
+        AtomicBoolean release = new AtomicBoolean(false); // Gate for the TestMessage serializer below.
+        MessageSerializer<TestMessage> serializer = new TestMessageSerializationFactory(
+                new TestMessagesFactory()).createSerializer();
+        Serializer longWaitSerializer = new Serializer(TestMessageImpl.GROUP_TYPE, TestMessageImpl.TYPE,
+                (message, writer) -> release.get() // false until released; then && delegates to the real serializer
+                        && serializer.writeMessage((TestMessage) message, writer)); // presumably false = 'not fully written'; confirm
+        try (Services services = createMessagingService(
+                senderNode,
+                senderNetworkConfig,
+                () -> {},
+                mockSerializationRegistry(longWaitSerializer)); // Only the sender side gets the gated serializer.
+                Services receiverServices = createMessagingService(receiverNode, receiverNetworkConfig, () -> {})
+        ) {
+            CountDownLatch latch = new CountDownLatch(2); // One count per message expected at the receiver.
+            receiverServices.messagingService.addMessageHandler(
+                    TestMessageTypes.class,
+                    (message, sender, correlationId) -> latch.countDown()
+            );
+
+            services.messagingService.send(receiverNode, TestMessageImpl.builder().build()); // Gated message goes first.
+            services.messagingService.send(receiverNode, AllTypesMessageImpl.builder().build()); // Same overload, no channel argument.
+
+            assertThat(latch.getCount(), is(2L)); // No handler call yet; presumably the second message waits behind the gated one.
+            release.set(true); // Open the gate.
+            assertTrue(latch.await(1, TimeUnit.SECONDS)); // Both messages must now arrive within one second.
+        }
+    }
+
+    @Test
+    public void sendMessagesTwoChannels() throws Exception { // Second send uses TEST_CHANNEL; one delivery expected before release.
+        AtomicBoolean release = new AtomicBoolean(false); // Gate for the TestMessage serializer below.
+        MessageSerializer<TestMessage> serializer = new TestMessageSerializationFactory(
+                new TestMessagesFactory()).createSerializer();
+        Serializer longWaitSerializer = new Serializer(TestMessageImpl.GROUP_TYPE, TestMessageImpl.TYPE,
+                (message, writer) -> release.get() // false until released; then && delegates to the real serializer
+                        && serializer.writeMessage((TestMessage) message, writer)); // presumably false = 'not fully written'; confirm
+        try (Services services = createMessagingService(
+                senderNode,
+                senderNetworkConfig,
+                () -> {},
+                mockSerializationRegistry(longWaitSerializer)); // Only the sender side gets the gated serializer.
+                Services receiverServices = createMessagingService(receiverNode, receiverNetworkConfig, () -> {})
+        ) {
+            CountDownLatch latch = new CountDownLatch(2); // One count per message expected at the receiver.
+            receiverServices.messagingService.addMessageHandler(
+                    TestMessageTypes.class,
+                    (message, sender, correlationId) -> latch.countDown()
+            );
+
+            services.messagingService.send(receiverNode, TestMessageImpl.builder().build()); // Gated message, no channel argument.
+            services.messagingService.send(receiverNode, TEST_CHANNEL, AllTypesMessageImpl.builder().build()); // Sent on TEST_CHANNEL.
+
+            await().timeout(1, TimeUnit.SECONDS) // Poll for at most one second.
+                    .until(() -> latch.getCount() == 1); // One message arrives with the gate closed; presumably the TEST_CHANNEL one.
+
+            release.set(true); // Open the gate for the remaining message.
+            assertTrue(latch.await(1, TimeUnit.SECONDS)); // The remaining message must arrive within one second.
+        }
+    }
+
+    private static MessageSerializationRegistry mockSerializationRegistry(Serializer... serializers) {
+        MessageSerializationRegistry defaultRegistry = defaultSerializationRegistry(); // Fallback for types without an override.
+
+        MessageSerializationRegistry wrapper = new MessageSerializationRegistry() { // Overrides first, default registry otherwise.
+            @Override
+            public MessageSerializationRegistry registerFactory(short groupType, short messageType,
+                    MessageSerializationFactory<?> factory) {
+                return this; // No-op: the factory argument is ignored by this wrapper.
+            }
+
+            @Override
+            public <T extends NetworkMessage> MessageSerializer<T> createSerializer(short groupType, short messageType) {
+                for (Serializer serializer : serializers) { // First override matching both type ids wins.
+                    if (serializer.groupType == groupType && serializer.messageType == messageType) {
+                        return (MessageSerializer<T>) serializer.serializer; // Unchecked cast: nothing here verifies T.
+                    }
+                }
+                return defaultRegistry.createSerializer(groupType, messageType); // No override matched.
+            }
+
+            @Override
+            public <T extends NetworkMessage> MessageDeserializer<T> createDeserializer(short groupType, short messageType) {
+                return defaultRegistry.createDeserializer(groupType, messageType); // Deserializers are never overridden.
+            }
+        };
+
+        return wrapper;
+    }
+
+    private static class Serializer { // Holds a (groupType, messageType) pair together with the serializer to use for it.
+        private final short groupType; // Compared against the requested group type in mockSerializationRegistry.
+        private final short messageType; // Compared against the requested message type in mockSerializationRegistry.
+        private final MessageSerializer<? extends NetworkMessage> serializer; // Returned when both type ids match.
+
+        private Serializer(short groupType, short messageType, MessageSerializer<? extends NetworkMessage> serializer) {
+            this.groupType = groupType;
+            this.messageType = messageType;
+            this.serializer = serializer;
+        }
+    }
+
     private static void awaitQuietly(CountDownLatch latch) {
         try {
             latch.await();
@@ -146,6 +260,15 @@ class DefaultMessagingServiceTest {
     }
 
     private Services createMessagingService(ClusterNode node, NetworkConfiguration networkConfig, Runnable beforeHandshake) {
+        return createMessagingService(node, networkConfig, beforeHandshake, messageSerializationRegistry); // Uses the shared registry.
+    }
+
+    private Services createMessagingService(
+            ClusterNode node,
+            NetworkConfiguration networkConfig,
+            Runnable beforeHandshake,
+            MessageSerializationRegistry registry
+    ) {
         ClassDescriptorRegistry classDescriptorRegistry = new ClassDescriptorRegistry();
         ClassDescriptorFactory classDescriptorFactory = new ClassDescriptorFactory(classDescriptorRegistry);
         UserObjectMarshaller marshaller = new DefaultUserObjectMarshaller(classDescriptorRegistry, classDescriptorFactory);
@@ -158,7 +281,7 @@ class DefaultMessagingServiceTest {
         );
 
         SerializationService serializationService = new SerializationService(
-                messageSerializationRegistry,
+                registry,
                 new UserObjectSerializationContext(classDescriptorRegistry, classDescriptorFactory, marshaller)
         );
 
@@ -182,10 +305,13 @@ class DefaultMessagingServiceTest {
         return new Services(connectionManager, messagingService);
     }
 
-    private static RecoveryClientHandhakeManagerFactory clientHandshakeManagerFactoryAdding(Runnable beforeHandshake) {
-        return new RecoveryClientHandhakeManagerFactory() {
+    private static RecoveryClientHandshakeManagerFactory clientHandshakeManagerFactoryAdding(Runnable beforeHandshake) {
+        return new RecoveryClientHandshakeManagerFactory() {
             @Override
-            public RecoveryClientHandshakeManager create(UUID launchId, String consistentId, short connectionId,
+            public RecoveryClientHandshakeManager create(
+                    UUID launchId,
+                    String consistentId,
+                    short connectionId,
                     RecoveryDescriptorProvider recoveryDescriptorProvider) {
                 return new RecoveryClientHandshakeManager(launchId, consistentId, connectionId, recoveryDescriptorProvider) {
                     @Override
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
index 10e217ac2a..4e6b0caaed 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
@@ -28,6 +28,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.ignite.network.AbstractMessagingService;
 import org.apache.ignite.network.AbstractTopologyService;
+import org.apache.ignite.network.ChannelType;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
@@ -171,7 +172,7 @@ public class ClusterServiceFactory {
 
         /** {@inheritDoc} */
         @Override
-        public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg) {
+        public CompletableFuture<Void> send(ClusterNode recipient, ChannelType channelType, NetworkMessage msg) {
             for (var handler : messagingServicesByNode.get(recipient.name()).messageHandlers(msg.groupType())) {
                 handler.onReceived(msg, localNodeName, null);
             }
@@ -181,25 +182,25 @@ public class ClusterServiceFactory {
 
         /** {@inheritDoc} */
         @Override
-        public CompletableFuture<Void> respond(ClusterNode recipient, NetworkMessage msg, long correlationId) {
+        public CompletableFuture<Void> respond(ClusterNode recipient, ChannelType type, NetworkMessage msg, long correlationId) {
             throw new AssertionError("Not implemented yet");
         }
 
         /** {@inheritDoc} */
         @Override
-        public CompletableFuture<Void> respond(String recipientConsistentId, NetworkMessage msg, long correlationId) {
+        public CompletableFuture<Void> respond(String recipientConsistentId, ChannelType type, NetworkMessage msg, long correlationId) {
             throw new AssertionError("Not implemented yet");
         }
 
         /** {@inheritDoc} */
         @Override
-        public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout) {
+        public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, ChannelType type, NetworkMessage msg, long timeout) {
             throw new AssertionError("Not implemented yet");
         }
 
         /** {@inheritDoc} */
         @Override
-        public CompletableFuture<NetworkMessage> invoke(String recipientNodeId, NetworkMessage msg, long timeout) {
+        public CompletableFuture<NetworkMessage> invoke(String recipientNodeId, ChannelType type, NetworkMessage msg, long timeout) {
             throw new AssertionError("Not implemented yet");
         }