You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ap...@apache.org on 2024/02/01 08:41:07 UTC

(ignite-3) branch main updated: IGNITE-21414 Use the same event loop for channels of the same logical connection (#3135)

This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f96035f0fb IGNITE-21414 Use the same event loop for channels of the same logical connection (#3135)
f96035f0fb is described below

commit f96035f0fb65dd67c67a0029ff9a9ca4c18de439
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Thu Feb 1 12:41:02 2024 +0400

    IGNITE-21414 Use the same event loop for channels of the same logical connection (#3135)
---
 .../network/netty/ChannelEventLoopsSource.java     |  33 ++++
 .../internal/network/netty/ConnectionManager.java  |   5 +
 .../network/recovery/HandshakeManagerUtils.java    |  47 +++--
 .../recovery/RecoveryClientHandshakeManager.java   |   7 +-
 .../recovery/RecoveryServerHandshakeManager.java   |   7 +-
 .../ignite/network/NettyBootstrapFactory.java      |  39 +++-
 .../ignite/network/NettyWorkersRegistrar.java      |   8 +-
 .../network/netty/RecoveryHandshakeTest.java       | 211 ++++++++++++++-------
 .../RecoveryClientHandshakeManagerTest.java        |   7 +-
 .../RecoveryServerHandshakeManagerTest.java        |   7 +-
 .../network/DefaultMessagingServiceTest.java       |   8 +-
 11 files changed, 267 insertions(+), 112 deletions(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelEventLoopsSource.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelEventLoopsSource.java
new file mode 100644
index 0000000000..a499eb5843
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelEventLoopsSource.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.EventLoop;
+import java.util.List;
+
+/**
+ * Allows to obtain a list of all event loops with which {@link io.netty.channel.Channel}s might be registered.
+ */
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface ChannelEventLoopsSource {
+    /**
+     * Returns list of all event loops with which {@link io.netty.channel.Channel}s might be registered.
+     * This must always return the same event loops in the same order.
+     */
+    List<EventLoop> channelEventLoops();
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index e727f30eae..557c14e401 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -107,6 +107,8 @@ public class ConnectionManager implements ChannelCreationListener {
     /** Node launch id. As opposed to {@link #consistentId}, this identifier changes between restarts. */
     private final UUID launchId;
 
+    private final NettyBootstrapFactory bootstrapFactory;
+
     /** Used to detect that a peer uses a stale ID. */
     private final StaleIdDetector staleIdDetector;
 
@@ -182,6 +184,7 @@ public class ConnectionManager implements ChannelCreationListener {
         this.serializationService = serializationService;
         this.launchId = launchId;
         this.consistentId = consistentId;
+        this.bootstrapFactory = bootstrapFactory;
         this.staleIdDetector = staleIdDetector;
         this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
         this.networkConfiguration = networkConfiguration;
@@ -481,6 +484,7 @@ public class ConnectionManager implements ChannelCreationListener {
                     consistentId,
                     connectionId,
                     descriptorProvider,
+                    bootstrapFactory,
                     staleIdDetector,
                     this,
                     stopping::get
@@ -501,6 +505,7 @@ public class ConnectionManager implements ChannelCreationListener {
                 consistentId,
                 FACTORY,
                 descriptorProvider,
+                bootstrapFactory,
                 staleIdDetector,
                 this,
                 stopping::get
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
index 6e790dd419..7f54fbe2e1 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
@@ -22,13 +22,13 @@ import static java.util.Collections.emptyList;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.EventLoop;
-import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.EventExecutor;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.netty.ChannelEventLoopsSource;
 import org.apache.ignite.internal.network.netty.ChannelKey;
 import org.apache.ignite.internal.network.netty.NettySender;
 import org.apache.ignite.internal.network.netty.NettyUtils;
@@ -37,6 +37,8 @@ import org.apache.ignite.internal.network.recovery.message.HandshakeRejectionRea
 import org.apache.ignite.network.OutNetworkObject;
 
 class HandshakeManagerUtils {
+    private static final IgniteLogger LOG = Loggers.forClass(HandshakeManagerUtils.class);
+
     private static final NetworkMessagesFactory MESSAGE_FACTORY = new NetworkMessagesFactory();
 
     static void sendRejectionMessageAndFailHandshake(
@@ -76,14 +78,36 @@ class HandshakeManagerUtils {
      *
      * @param channel Channel to move.
      * @param channelKey Key of the logical connection.
+     * @param eventLoopsSource Used to get all event loops with which a channel might be registered.
      * @param afterSwitching Action to execute after switching (it will be executed on the new event loop).
      */
-    static void switchEventLoopIfNeeded(Channel channel, ChannelKey channelKey, Runnable afterSwitching) {
-        EventLoop targetEventLoop = eventLoopForKey(channelKey, channel);
+    static void switchEventLoopIfNeeded(
+            Channel channel,
+            ChannelKey channelKey,
+            ChannelEventLoopsSource eventLoopsSource,
+            Runnable afterSwitching
+    ) {
+        EventLoop targetEventLoop = eventLoopForKey(channelKey, eventLoopsSource);
 
         if (targetEventLoop != channel.eventLoop()) {
             channel.deregister().addListener(deregistrationFuture -> {
+                if (!deregistrationFuture.isSuccess()) {
+                    LOG.error("Cannot deregister a channel from an event loop", deregistrationFuture.cause());
+
+                    channel.close();
+
+                    return;
+                }
+
                 targetEventLoop.register(channel).addListener(registrationFuture -> {
+                    if (!registrationFuture.isSuccess()) {
+                        LOG.error("Cannot register a channel with an event loop", registrationFuture.cause());
+
+                        channel.close();
+
+                        return;
+                    }
+
                     afterSwitching.run();
                 });
             });
@@ -92,17 +116,8 @@ class HandshakeManagerUtils {
         }
     }
 
-    private static EventLoop eventLoopForKey(ChannelKey channelKey, Channel channel) {
-        EventLoopGroup group = channel.eventLoop().parent();
-        if (group == null) {
-            // EmbeddedEventLoop#parent() returns null, handle this.
-            return channel.eventLoop();
-        }
-
-        List<EventLoop> eventLoops = new ArrayList<>();
-        for (EventExecutor childExecutor : group) {
-            eventLoops.add((EventLoop) childExecutor);
-        }
+    private static EventLoop eventLoopForKey(ChannelKey channelKey, ChannelEventLoopsSource eventLoopsSource) {
+        List<EventLoop> eventLoops = eventLoopsSource.channelEventLoops();
 
         int index = (channelKey.hashCode() & Integer.MAX_VALUE) % eventLoops.size();
 
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index 6247f94100..3c79607534 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsExceptio
 import org.apache.ignite.internal.network.handshake.HandshakeException;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.internal.network.netty.ChannelCreationListener;
+import org.apache.ignite.internal.network.netty.ChannelEventLoopsSource;
 import org.apache.ignite.internal.network.netty.ChannelKey;
 import org.apache.ignite.internal.network.netty.HandshakeHandler;
 import org.apache.ignite.internal.network.netty.MessageHandler;
@@ -72,6 +73,8 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
     /** Recovery descriptor provider. */
     private final RecoveryDescriptorProvider recoveryDescriptorProvider;
 
+    private final ChannelEventLoopsSource channelEventLoopsSource;
+
     /** Used to detect that a peer uses a stale ID. */
     private final StaleIdDetector staleIdDetector;
 
@@ -122,6 +125,7 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
             String consistentId,
             short connectionId,
             RecoveryDescriptorProvider recoveryDescriptorProvider,
+            ChannelEventLoopsSource channelEventLoopsSource,
             StaleIdDetector staleIdDetector,
             ChannelCreationListener channelCreationListener,
             BooleanSupplier stopping
@@ -130,6 +134,7 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
         this.consistentId = consistentId;
         this.connectionId = connectionId;
         this.recoveryDescriptorProvider = recoveryDescriptorProvider;
+        this.channelEventLoopsSource = channelEventLoopsSource;
         this.staleIdDetector = staleIdDetector;
         this.stopping = stopping;
 
@@ -230,7 +235,7 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
         this.remoteConsistentId = handshakeStartMessage.consistentId();
 
         ChannelKey channelKey = new ChannelKey(remoteConsistentId, remoteLaunchId, connectionId);
-        switchEventLoopIfNeeded(channel, channelKey, () -> proceedAfterSavingIds(handshakeStartMessage));
+        switchEventLoopIfNeeded(channel, channelKey, channelEventLoopsSource, () -> proceedAfterSavingIds(handshakeStartMessage));
     }
 
     private void proceedAfterSavingIds(HandshakeStartMessage handshakeStartMessage) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
index e38bee24f9..fb227db5be 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.handshake.HandshakeException;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.internal.network.netty.ChannelCreationListener;
+import org.apache.ignite.internal.network.netty.ChannelEventLoopsSource;
 import org.apache.ignite.internal.network.netty.ChannelKey;
 import org.apache.ignite.internal.network.netty.HandshakeHandler;
 import org.apache.ignite.internal.network.netty.MessageHandler;
@@ -96,6 +97,8 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
     /** Recovery descriptor provider. */
     private final RecoveryDescriptorProvider recoveryDescriptorProvider;
 
+    private final ChannelEventLoopsSource channelEventLoopsSource;
+
     /** Used to detect that a peer uses a stale ID. */
     private final StaleIdDetector staleIdDetector;
 
@@ -120,6 +123,7 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
             String consistentId,
             NetworkMessagesFactory messageFactory,
             RecoveryDescriptorProvider recoveryDescriptorProvider,
+            ChannelEventLoopsSource channelEventLoopsSource,
             StaleIdDetector staleIdDetector,
             ChannelCreationListener channelCreationListener,
             BooleanSupplier stopping
@@ -128,6 +132,7 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
         this.consistentId = consistentId;
         this.messageFactory = messageFactory;
         this.recoveryDescriptorProvider = recoveryDescriptorProvider;
+        this.channelEventLoopsSource = channelEventLoopsSource;
         this.staleIdDetector = staleIdDetector;
         this.stopping = stopping;
 
@@ -218,7 +223,7 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
         this.remoteChannelId = message.connectionId();
 
         ChannelKey channelKey = new ChannelKey(remoteConsistentId, remoteLaunchId, remoteChannelId);
-        switchEventLoopIfNeeded(channel, channelKey, () -> tryAcquireDescriptorAndFinishHandshake(message));
+        switchEventLoopIfNeeded(channel, channelKey, channelEventLoopsSource, () -> tryAcquireDescriptorAndFinishHandshake(message));
     }
 
     private boolean possiblyRejectHandshakeStartResponse(HandshakeStartResponseMessage message) {
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
index 7c8d168629..c270bfe251 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
@@ -23,10 +23,12 @@ import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.EventExecutor;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.manager.IgniteComponent;
@@ -34,6 +36,7 @@ import org.apache.ignite.internal.network.configuration.InboundView;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import org.apache.ignite.internal.network.configuration.NetworkView;
 import org.apache.ignite.internal.network.configuration.OutboundView;
+import org.apache.ignite.internal.network.netty.ChannelEventLoopsSource;
 import org.apache.ignite.internal.network.netty.NamedNioEventLoopGroup;
 import org.apache.ignite.internal.network.netty.NamedNioEventLoopGroup.NetworkThread;
 import org.jetbrains.annotations.TestOnly;
@@ -41,7 +44,7 @@ import org.jetbrains.annotations.TestOnly;
 /**
  * Netty bootstrap factory. Holds shared {@link EventLoopGroup} instances and encapsulates common Netty {@link Bootstrap} creation logic.
  */
-public class NettyBootstrapFactory implements IgniteComponent {
+public class NettyBootstrapFactory implements IgniteComponent, ChannelEventLoopsSource {
     /** Network configuration. */
     private final NetworkConfiguration networkConfiguration;
 
@@ -49,13 +52,16 @@ public class NettyBootstrapFactory implements IgniteComponent {
     private final String eventLoopGroupNamePrefix;
 
     /** Server boss socket channel handler event loop group. */
-    private NioEventLoopGroup bossGroup;
+    private EventLoopGroup bossGroup;
 
     /** Server work socket channel handler event loop group. */
-    private NioEventLoopGroup workerGroup;
+    private EventLoopGroup workerGroup;
 
     /** Client socket channel handler event loop group. */
-    private NioEventLoopGroup clientWorkerGroup;
+    private EventLoopGroup clientWorkerGroup;
+
+    /** All event loops with which {@link io.netty.channel.Channel}s might be registered. */
+    private volatile List<EventLoop> channelEventLoops;
 
     /**
      * Constructor.
@@ -138,7 +144,7 @@ public class NettyBootstrapFactory implements IgniteComponent {
     /**
      * Returns all event loop groups managed by this factory.
      */
-    List<NioEventLoopGroup> eventLoopGroups() {
+    List<EventLoopGroup> eventLoopGroups() {
         return List.of(bossGroup, workerGroup, clientWorkerGroup);
     }
 
@@ -149,9 +155,23 @@ public class NettyBootstrapFactory implements IgniteComponent {
         workerGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix + "-srv-worker");
         clientWorkerGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix + "-client");
 
+        this.channelEventLoops = List.copyOf(eventLoopsAt(workerGroup, clientWorkerGroup));
+
         return nullCompletedFuture();
     }
 
+    private static List<EventLoop> eventLoopsAt(EventLoopGroup ... groups) {
+        List<EventLoop> channelEventLoops = new ArrayList<>();
+
+        for (EventLoopGroup group : groups) {
+            for (EventExecutor child : group) {
+                channelEventLoops.add((EventLoop) child);
+            }
+        }
+
+        return channelEventLoops;
+    }
+
     /**
      * Returns {@code true} if the current thread is a network thread, {@code false} otherwise.
      *
@@ -176,11 +196,16 @@ public class NettyBootstrapFactory implements IgniteComponent {
     }
 
 
+    @Override
+    public List<EventLoop> channelEventLoops() {
+        return channelEventLoops;
+    }
+
     /**
      * Returns worker event loop group.
      */
     @TestOnly
-    public NioEventLoopGroup serverEventLoopGroup() {
+    public EventLoopGroup serverEventLoopGroup() {
         return workerGroup;
     }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NettyWorkersRegistrar.java b/modules/network/src/main/java/org/apache/ignite/network/NettyWorkersRegistrar.java
index cf60d08d39..a7e0a82089 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NettyWorkersRegistrar.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NettyWorkersRegistrar.java
@@ -20,7 +20,7 @@ package org.apache.ignite.network;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.SingleThreadEventExecutor;
 import java.util.ArrayList;
@@ -83,7 +83,7 @@ public class NettyWorkersRegistrar implements IgniteComponent {
     @Override
     public CompletableFuture<Void> start() {
         List<NettyWorker> nettyWorkers = new ArrayList<>();
-        for (NioEventLoopGroup group : bootstrapFactory.eventLoopGroups()) {
+        for (EventLoopGroup group : bootstrapFactory.eventLoopGroups()) {
             registerWorkersFor(group, nettyWorkers);
         }
         workers = List.copyOf(nettyWorkers);
@@ -94,8 +94,8 @@ public class NettyWorkersRegistrar implements IgniteComponent {
         return nullCompletedFuture();
     }
 
-    private void registerWorkersFor(NioEventLoopGroup group, List<NettyWorker> nettyWorkers) {
-        List<NettyWorker> groupWorkers = new ArrayList<>(group.executorCount());
+    private void registerWorkersFor(EventLoopGroup group, List<NettyWorker> nettyWorkers) {
+        List<NettyWorker> groupWorkers = new ArrayList<>();
 
         for (EventExecutor eventExecutor : group) {
             SingleThreadEventExecutor executor = (SingleThreadEventExecutor) eventExecutor;
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index a5eb3c2dad..4d1376345f 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -26,8 +26,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
 import io.netty.channel.embedded.EmbeddedChannel;
 import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -81,12 +83,14 @@ public class RecoveryHandshakeTest {
         RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
         RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
 
-        RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);
-        RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(serverRecovery);
+        EmbeddedChannel clientSideChannel = createUnregisteredChannel();
+        EmbeddedChannel serverSideChannel = createUnregisteredChannel();
 
-        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, noMessageListener);
+        RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(clientSideChannel, clientRecovery);
+        RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(serverSideChannel, serverRecovery);
 
-        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, noMessageListener);
+        setupChannel(clientSideChannel, clientHandshakeManager, noMessageListener);
+        setupChannel(serverSideChannel, serverHandshakeManager, noMessageListener);
 
         assertTrue(serverSideChannel.isActive());
 
@@ -112,16 +116,23 @@ public class RecoveryHandshakeTest {
         RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
         RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
 
+        EmbeddedChannel clientSideChannel = createUnregisteredChannel();
+        EmbeddedChannel serverSideChannel = createUnregisteredChannel();
+
         UUID clientLaunchId = UUID.randomUUID();
         RecoveryDescriptor serverRecoveryDescriptor = serverRecovery.getRecoveryDescriptor("client", clientLaunchId, CONNECTION_ID);
         addUnacknowledgedMessages(serverRecoveryDescriptor);
 
-        RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager("client", clientLaunchId,
-                clientRecovery);
-        RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(serverRecovery);
+        RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(
+                clientSideChannel,
+                "client",
+                clientLaunchId,
+                clientRecovery
+        );
+        RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(serverSideChannel, serverRecovery);
 
         var messageCaptor = new AtomicReference<TestMessage>();
-        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, (inObject) -> {
+        setupChannel(clientSideChannel, clientHandshakeManager, (inObject) -> {
             NetworkMessage msg = inObject.message();
 
             assertInstanceOf(TestMessage.class, msg);
@@ -129,7 +140,7 @@ public class RecoveryHandshakeTest {
             messageCaptor.set((TestMessage) msg);
         });
 
-        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, noMessageListener);
+        setupChannel(serverSideChannel, serverHandshakeManager, noMessageListener);
 
         assertTrue(serverSideChannel.isActive());
 
@@ -165,18 +176,25 @@ public class RecoveryHandshakeTest {
         RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
         RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
 
+        EmbeddedChannel clientSideChannel = createUnregisteredChannel();
+        EmbeddedChannel serverSideChannel = createUnregisteredChannel();
+
         UUID serverLaunchId = UUID.randomUUID();
         RecoveryDescriptor clientRecoveryDescriptor = clientRecovery.getRecoveryDescriptor("server", serverLaunchId, CONNECTION_ID);
         addUnacknowledgedMessages(clientRecoveryDescriptor);
 
-        RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);
-        RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager("server", serverLaunchId,
-                serverRecovery);
+        RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(clientSideChannel, clientRecovery);
+        RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(
+                serverSideChannel,
+                "server",
+                serverLaunchId,
+                serverRecovery
+        );
 
         var messageCaptor = new AtomicReference<TestMessage>();
-        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, noMessageListener);
+        setupChannel(clientSideChannel, clientHandshakeManager, noMessageListener);
 
-        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, (inObject) -> {
+        setupChannel(serverSideChannel, serverHandshakeManager, (inObject) -> {
             NetworkMessage msg = inObject.message();
 
             assertInstanceOf(TestMessage.class, msg);
@@ -218,25 +236,30 @@ public class RecoveryHandshakeTest {
         RecoveryDescriptorProvider node1Recovery = createRecoveryDescriptorProvider();
         RecoveryDescriptorProvider node2Recovery = createRecoveryDescriptorProvider();
 
+        EmbeddedChannel channel1Src = createUnregisteredChannel();
+        EmbeddedChannel channel1Dst = createUnregisteredChannel();
+        EmbeddedChannel channel2Src = createUnregisteredChannel();
+        EmbeddedChannel channel2Dst = createUnregisteredChannel();
+
         UUID node1Uuid = LOWER_UUID;
         UUID node2Uuid = HIGHER_UUID;
 
-        RecoveryClientHandshakeManager chm1 = createRecoveryClientHandshakeManager("client", node1Uuid, node1Recovery);
-        RecoveryServerHandshakeManager shm1 = createRecoveryServerHandshakeManager("client", node1Uuid, node1Recovery);
+        RecoveryClientHandshakeManager chm1 = createRecoveryClientHandshakeManager(channel1Src, "client", node1Uuid, node1Recovery);
+        RecoveryServerHandshakeManager shm1 = createRecoveryServerHandshakeManager(channel2Dst, "client", node1Uuid, node1Recovery);
 
-        RecoveryClientHandshakeManager chm2 = createRecoveryClientHandshakeManager("server", node2Uuid, node2Recovery);
-        RecoveryServerHandshakeManager shm2 = createRecoveryServerHandshakeManager("server", node2Uuid, node2Recovery);
+        RecoveryClientHandshakeManager chm2 = createRecoveryClientHandshakeManager(channel2Src, "server", node2Uuid, node2Recovery);
+        RecoveryServerHandshakeManager shm2 = createRecoveryServerHandshakeManager(channel1Dst, "server", node2Uuid, node2Recovery);
 
         // Channel opened from node1 to node2 is channel 1.
         // Channel opened from node2 to node1 is channel 2.
 
         // Channel 1.
-        EmbeddedChannel channel1Src = setupChannel(chm1, noMessageListener);
-        EmbeddedChannel channel1Dst = setupChannel(shm2, noMessageListener);
+        setupChannel(channel1Src, chm1, noMessageListener);
+        setupChannel(channel1Dst, shm2, noMessageListener);
 
         // Channel 2.
-        EmbeddedChannel channel2Src = setupChannel(chm2, noMessageListener);
-        EmbeddedChannel channel2Dst = setupChannel(shm1, noMessageListener);
+        setupChannel(channel2Src, chm2, noMessageListener);
+        setupChannel(channel2Dst, shm1, noMessageListener);
 
         exchangeServerToClient(channel2Dst, channel2Src);
         exchangeServerToClient(channel1Dst, channel1Src);
@@ -273,25 +296,30 @@ public class RecoveryHandshakeTest {
         RecoveryDescriptorProvider node1Recovery = createRecoveryDescriptorProvider();
         RecoveryDescriptorProvider node2Recovery = createRecoveryDescriptorProvider();
 
+        EmbeddedChannel channel1Src = createUnregisteredChannel();
+        EmbeddedChannel channel1Dst = createUnregisteredChannel();
+        EmbeddedChannel channel2Src = createUnregisteredChannel();
+        EmbeddedChannel channel2Dst = createUnregisteredChannel();
+
         UUID node1Uuid = node1LaunchIdIsLower ? LOWER_UUID : HIGHER_UUID;
         UUID node2Uuid = node1LaunchIdIsLower ? HIGHER_UUID : LOWER_UUID;
 
-        RecoveryClientHandshakeManager chm1 = createRecoveryClientHandshakeManager("client", node1Uuid, node1Recovery);
-        RecoveryServerHandshakeManager shm1 = createRecoveryServerHandshakeManager("client", node1Uuid, node1Recovery);
+        RecoveryClientHandshakeManager chm1 = createRecoveryClientHandshakeManager(channel1Src, "client", node1Uuid, node1Recovery);
+        RecoveryServerHandshakeManager shm1 = createRecoveryServerHandshakeManager(channel2Dst, "client", node1Uuid, node1Recovery);
 
-        RecoveryClientHandshakeManager chm2 = createRecoveryClientHandshakeManager("server", node2Uuid, node2Recovery);
-        RecoveryServerHandshakeManager shm2 = createRecoveryServerHandshakeManager("server", node2Uuid, node2Recovery);
+        RecoveryClientHandshakeManager chm2 = createRecoveryClientHandshakeManager(channel2Src, "server", node2Uuid, node2Recovery);
+        RecoveryServerHandshakeManager shm2 = createRecoveryServerHandshakeManager(channel1Dst, "server", node2Uuid, node2Recovery);
 
         // Channel opened from node1 to node2 is channel 1.
         // Channel opened from node2 to node1 is channel 2.
 
         // Channel 1.
-        EmbeddedChannel channel1Src = setupChannel(chm1, noMessageListener);
-        EmbeddedChannel channel1Dst = setupChannel(shm2, noMessageListener);
+        setupChannel(channel1Src, chm1, noMessageListener);
+        setupChannel(channel1Dst, shm2, noMessageListener);
 
         // Channel 2.
-        EmbeddedChannel channel2Src = setupChannel(chm2, noMessageListener);
-        EmbeddedChannel channel2Dst = setupChannel(shm1, noMessageListener);
+        setupChannel(channel2Src, chm2, noMessageListener);
+        setupChannel(channel2Dst, shm1, noMessageListener);
 
         // Channel 2's handshake acquires both locks.
         exchangeServerToClient(channel2Dst, channel2Src);
@@ -339,17 +367,28 @@ public class RecoveryHandshakeTest {
         RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
         RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
 
-        RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(client, clientLaunchId,
-                clientRecovery);
-        RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(server, serverLaunchId,
-                serverRecovery);
+        EmbeddedChannel clientSideChannel = createUnregisteredChannel();
+        EmbeddedChannel serverSideChannel = createUnregisteredChannel();
+
+        RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(
+                clientSideChannel,
+                client,
+                clientLaunchId,
+                clientRecovery
+        );
+        RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(
+                serverSideChannel,
+                server,
+                serverLaunchId,
+                serverRecovery
+        );
 
         var receivedFirst = new AtomicBoolean();
 
         var listener1 = new MessageListener("1", receivedFirst);
 
-        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, serverDidntReceiveAck ? listener1 : noMessageListener);
-        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, serverDidntReceiveAck ?  noMessageListener : listener1);
+        setupChannel(clientSideChannel, clientHandshakeManager, serverDidntReceiveAck ? listener1 : noMessageListener);
+        setupChannel(serverSideChannel, serverHandshakeManager, serverDidntReceiveAck ?  noMessageListener : listener1);
 
         // Normal handshake
         exchangeServerToClient(serverSideChannel, clientSideChannel);
@@ -380,8 +419,8 @@ public class RecoveryHandshakeTest {
         }
 
         // Simulate reconnection
-        clientHandshakeManager = createRecoveryClientHandshakeManager(client, clientLaunchId, clientRecovery);
-        serverHandshakeManager = createRecoveryServerHandshakeManager(server, serverLaunchId, serverRecovery);
+        clientHandshakeManager = createRecoveryClientHandshakeManager(clientSideChannel, client, clientLaunchId, clientRecovery);
+        serverHandshakeManager = createRecoveryServerHandshakeManager(serverSideChannel, server, serverLaunchId, serverRecovery);
 
         var receivedSecond = new AtomicBoolean();
 
@@ -390,8 +429,11 @@ public class RecoveryHandshakeTest {
         clientSideChannel.finishAndReleaseAll();
         serverSideChannel.finishAndReleaseAll();
 
-        clientSideChannel = setupChannel(clientHandshakeManager, serverDidntReceiveAck ? listener2 : noMessageListener);
-        serverSideChannel = setupChannel(serverHandshakeManager, serverDidntReceiveAck ? noMessageListener : listener2);
+        clientSideChannel = createUnregisteredChannel();
+        serverSideChannel = createUnregisteredChannel();
+
+        setupChannel(clientSideChannel, clientHandshakeManager, serverDidntReceiveAck ? listener2 : noMessageListener);
+        setupChannel(serverSideChannel, serverHandshakeManager, serverDidntReceiveAck ? noMessageListener : listener2);
 
         // Handshake
         exchangeServerToClient(serverSideChannel, clientSideChannel);
@@ -426,17 +468,20 @@ public class RecoveryHandshakeTest {
         RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
         RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
 
-        RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);
+        EmbeddedChannel clientSideChannel = createUnregisteredChannel();
+        EmbeddedChannel serverSideChannel = createUnregisteredChannel();
+
+        RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(clientSideChannel, clientRecovery);
         RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(
+                serverSideChannel,
                 "server",
                 UUID.randomUUID(),
                 serverRecovery,
                 new AllIdsAreStale()
         );
 
-        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, noMessageListener);
-
-        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, noMessageListener);
+        setupChannel(clientSideChannel, clientHandshakeManager, noMessageListener);
+        setupChannel(serverSideChannel, serverHandshakeManager, noMessageListener);
 
         assertTrue(serverSideChannel.isActive());
 
@@ -462,17 +507,20 @@ public class RecoveryHandshakeTest {
         RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
         RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
 
+        EmbeddedChannel clientSideChannel = createUnregisteredChannel();
+        EmbeddedChannel serverSideChannel = createUnregisteredChannel();
+
         RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(
+                clientSideChannel,
                 "client",
                 UUID.randomUUID(),
                 clientRecovery,
                 new AllIdsAreStale()
         );
-        RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(serverRecovery);
-
-        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, noMessageListener);
+        RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(serverSideChannel, serverRecovery);
 
-        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, noMessageListener);
+        setupChannel(clientSideChannel, clientHandshakeManager, noMessageListener);
+        setupChannel(serverSideChannel, serverHandshakeManager, noMessageListener);
 
         assertTrue(serverSideChannel.isActive());
 
@@ -581,19 +629,20 @@ public class RecoveryHandshakeTest {
     private final Consumer<InNetworkObject> noMessageListener = inNetworkObject ->
             fail("Received message while shouldn't have, [" + inNetworkObject.message() + "]");
 
-    private EmbeddedChannel setupChannel(HandshakeManager handshakeManager, Consumer<InNetworkObject> messageListener) throws Exception {
-        // Channel should not be registered at first, not before we add pipeline handlers
-        // Otherwise, events like "channel active" won't be propagated to the handlers
-        var channel = new EmbeddedChannel(false, false);
-
+    private void setupChannel(EmbeddedChannel channel, HandshakeManager handshakeManager, Consumer<InNetworkObject> messageListener)
+            throws Exception {
         var serializationService = new SerializationService(MESSAGE_REGISTRY, createUserObjectSerializationContext());
         var sessionSerializationService = new PerSessionSerializationService(serializationService);
 
         PipelineUtils.setup(channel.pipeline(), sessionSerializationService, handshakeManager, messageListener);
 
         channel.register();
+    }
 
-        return channel;
+    private static EmbeddedChannel createUnregisteredChannel() {
+        // Channel should not be registered at first, not before we add pipeline handlers
+        // Otherwise, events like "channel active" won't be propagated to the handlers
+        return new EmbeddedChannel(false, false);
     }
 
     private UserObjectSerializationContext createUserObjectSerializationContext() {
@@ -606,44 +655,70 @@ public class RecoveryHandshakeTest {
                 userObjectMarshaller);
     }
 
-    private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(RecoveryDescriptorProvider provider) {
-        return createRecoveryClientHandshakeManager("client", UUID.randomUUID(), provider);
+    private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(
+            Channel clientSideChannel,
+            RecoveryDescriptorProvider provider
+    ) {
+        return createRecoveryClientHandshakeManager(clientSideChannel, "client", UUID.randomUUID(), provider);
     }
 
-    private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(String consistentId, UUID launchId,
-            RecoveryDescriptorProvider provider) {
-        return createRecoveryClientHandshakeManager(consistentId, launchId, provider, new AllIdsAreFresh());
+    private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(
+            Channel clientSideChannel,
+            String consistentId,
+            UUID launchId,
+            RecoveryDescriptorProvider provider
+    ) {
+        return createRecoveryClientHandshakeManager(clientSideChannel, consistentId, launchId, provider, new AllIdsAreFresh());
     }
 
-    private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(String consistentId, UUID launchId,
-            RecoveryDescriptorProvider provider, StaleIdDetector staleIdDetector) {
+    private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(
+            Channel clientSideChannel,
+            String consistentId,
+            UUID launchId,
+            RecoveryDescriptorProvider provider,
+            StaleIdDetector staleIdDetector
+    ) {
         return new RecoveryClientHandshakeManager(
                 launchId,
                 consistentId,
                 CONNECTION_ID,
                 provider,
+                () -> List.of(clientSideChannel.eventLoop()),
                 staleIdDetector,
                 channel -> {},
                 () -> false
         );
     }
 
-    private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(RecoveryDescriptorProvider provider) {
-        return createRecoveryServerHandshakeManager("server", UUID.randomUUID(), provider);
+    private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(
+            Channel serverSideChannel,
+            RecoveryDescriptorProvider provider
+    ) {
+        return createRecoveryServerHandshakeManager(serverSideChannel, "server", UUID.randomUUID(), provider);
     }
 
-    private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(String consistentId, UUID launchId,
-            RecoveryDescriptorProvider provider) {
-        return createRecoveryServerHandshakeManager(consistentId, launchId, provider, new AllIdsAreFresh());
+    private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(
+            Channel serverSideChannel,
+            String consistentId,
+            UUID launchId,
+            RecoveryDescriptorProvider provider
+    ) {
+        return createRecoveryServerHandshakeManager(serverSideChannel, consistentId, launchId, provider, new AllIdsAreFresh());
     }
 
-    private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(String consistentId, UUID launchId,
-            RecoveryDescriptorProvider provider, StaleIdDetector staleIdDetector) {
+    private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(
+            Channel serverSideChannel,
+            String consistentId,
+            UUID launchId,
+            RecoveryDescriptorProvider provider,
+            StaleIdDetector staleIdDetector
+    ) {
         return new RecoveryServerHandshakeManager(
                 launchId,
                 consistentId,
                 MESSAGE_FACTORY,
                 provider,
+                () -> List.of(serverSideChannel.eventLoop()),
                 staleIdDetector,
                 channel -> {},
                 () -> false
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java
index 0edb72fbc7..c7afffd61d 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java
@@ -40,7 +40,6 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.DefaultChannelProgressivePromise;
 import io.netty.channel.EventLoop;
-import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.EventExecutor;
 import java.util.List;
 import java.util.UUID;
@@ -105,9 +104,6 @@ class RecoveryClientHandshakeManagerTest extends BaseIgniteAbstractTest {
     @Mock
     private EventLoop eventLoop;
 
-    @Mock
-    private EventLoopGroup eventLoopGroup;
-
     @Mock
     private NettySender competitorNettySender;
 
@@ -127,8 +123,6 @@ class RecoveryClientHandshakeManagerTest extends BaseIgniteAbstractTest {
         lenient().when(eventExecutor.inEventLoop()).thenReturn(true);
 
         lenient().when(thisChannel.eventLoop()).thenReturn(eventLoop);
-        lenient().when(eventLoop.parent()).thenReturn(eventLoopGroup);
-        lenient().when(eventLoopGroup.iterator()).thenReturn(List.of((EventExecutor) eventLoop).iterator());
 
         lenient().when(recoveryDescriptorProvider.getRecoveryDescriptor(any(), any(), anyShort()))
                 .thenReturn(recoveryDescriptor);
@@ -182,6 +176,7 @@ class RecoveryClientHandshakeManagerTest extends BaseIgniteAbstractTest {
                 CLIENT_CONSISTENT_ID,
                 CONNECTION_INDEX,
                 recoveryDescriptorProvider,
+                () -> List.of(thisChannel.eventLoop()),
                 new AllIdsAreFresh(),
                 channelCreationListener,
                 stopping
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java
index 98a5b9641d..0950eb3414 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java
@@ -39,7 +39,6 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.DefaultChannelProgressivePromise;
 import io.netty.channel.EventLoop;
-import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.EventExecutor;
 import java.util.List;
 import java.util.UUID;
@@ -96,9 +95,6 @@ class RecoveryServerHandshakeManagerTest extends BaseIgniteAbstractTest {
     @Mock
     private EventLoop eventLoop;
 
-    @Mock
-    private EventLoopGroup eventLoopGroup;
-
     @Captor
     private ArgumentCaptor<OutNetworkObject> sentMessageCaptor;
 
@@ -120,8 +116,6 @@ class RecoveryServerHandshakeManagerTest extends BaseIgniteAbstractTest {
         lenient().when(eventExecutor.inEventLoop()).thenReturn(true);
 
         lenient().when(channel.eventLoop()).thenReturn(eventLoop);
-        lenient().when(eventLoop.parent()).thenReturn(eventLoopGroup);
-        lenient().when(eventLoopGroup.iterator()).thenReturn(List.of((EventExecutor) eventLoop).iterator());
 
         lenient().when(channel.writeAndFlush(any())).then(invocation -> {
             DefaultChannelProgressivePromise future = new DefaultChannelProgressivePromise(channel, eventExecutor);
@@ -173,6 +167,7 @@ class RecoveryServerHandshakeManagerTest extends BaseIgniteAbstractTest {
                 SERVER_CONSISTENT_ID,
                 MESSAGE_FACTORY,
                 recoveryDescriptorProvider,
+                () -> List.of(channel.eventLoop()),
                 new AllIdsAreFresh(),
                 channelCreationListener,
                 stopping
diff --git a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
index df0b826df4..4398c93cc1 100644
--- a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
@@ -474,7 +474,7 @@ class DefaultMessagingServiceTest extends BaseIgniteAbstractTest {
                 node.name(),
                 bootstrapFactory,
                 staleIdDetector,
-                clientHandshakeManagerFactoryAdding(beforeHandshake, staleIdDetector)
+                clientHandshakeManagerFactoryAdding(beforeHandshake, bootstrapFactory, staleIdDetector)
         );
         connectionManager.start();
 
@@ -484,19 +484,21 @@ class DefaultMessagingServiceTest extends BaseIgniteAbstractTest {
     }
 
     private static RecoveryClientHandshakeManagerFactory clientHandshakeManagerFactoryAdding(Runnable beforeHandshake,
-            StaleIdDetector staleIdDetector) {
+            NettyBootstrapFactory bootstrapFactory, StaleIdDetector staleIdDetector) {
         return new RecoveryClientHandshakeManagerFactory() {
             @Override
             public RecoveryClientHandshakeManager create(
                     UUID launchId,
                     String consistentId,
                     short connectionId,
-                    RecoveryDescriptorProvider recoveryDescriptorProvider) {
+                    RecoveryDescriptorProvider recoveryDescriptorProvider
+            ) {
                 return new RecoveryClientHandshakeManager(
                         launchId,
                         consistentId,
                         connectionId,
                         recoveryDescriptorProvider,
+                        bootstrapFactory,
                         staleIdDetector,
                         channel -> {},
                         () -> false