You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ap...@apache.org on 2024/02/01 08:41:07 UTC
(ignite-3) branch main updated: IGNITE-21414 Use the same event loop for channels of the same logical connection (#3135)
This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f96035f0fb IGNITE-21414 Use the same event loop for channels of the same logical connection (#3135)
f96035f0fb is described below
commit f96035f0fb65dd67c67a0029ff9a9ca4c18de439
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Thu Feb 1 12:41:02 2024 +0400
IGNITE-21414 Use the same event loop for channels of the same logical connection (#3135)
---
.../network/netty/ChannelEventLoopsSource.java | 33 ++++
.../internal/network/netty/ConnectionManager.java | 5 +
.../network/recovery/HandshakeManagerUtils.java | 47 +++--
.../recovery/RecoveryClientHandshakeManager.java | 7 +-
.../recovery/RecoveryServerHandshakeManager.java | 7 +-
.../ignite/network/NettyBootstrapFactory.java | 39 +++-
.../ignite/network/NettyWorkersRegistrar.java | 8 +-
.../network/netty/RecoveryHandshakeTest.java | 211 ++++++++++++++-------
.../RecoveryClientHandshakeManagerTest.java | 7 +-
.../RecoveryServerHandshakeManagerTest.java | 7 +-
.../network/DefaultMessagingServiceTest.java | 8 +-
11 files changed, 267 insertions(+), 112 deletions(-)
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelEventLoopsSource.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelEventLoopsSource.java
new file mode 100644
index 0000000000..a499eb5843
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelEventLoopsSource.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.EventLoop;
+import java.util.List;
+
+/**
+ * Allows obtaining a list of all event loops with which {@link io.netty.channel.Channel}s might be registered.
+ */
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface ChannelEventLoopsSource {
+ /**
+ * Returns the list of all event loops with which {@link io.netty.channel.Channel}s might be registered.
+ * This must always return the same event loops in the same order.
+ */
+ List<EventLoop> channelEventLoops();
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index e727f30eae..557c14e401 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -107,6 +107,8 @@ public class ConnectionManager implements ChannelCreationListener {
/** Node launch id. As opposed to {@link #consistentId}, this identifier changes between restarts. */
private final UUID launchId;
+ private final NettyBootstrapFactory bootstrapFactory;
+
/** Used to detect that a peer uses a stale ID. */
private final StaleIdDetector staleIdDetector;
@@ -182,6 +184,7 @@ public class ConnectionManager implements ChannelCreationListener {
this.serializationService = serializationService;
this.launchId = launchId;
this.consistentId = consistentId;
+ this.bootstrapFactory = bootstrapFactory;
this.staleIdDetector = staleIdDetector;
this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
this.networkConfiguration = networkConfiguration;
@@ -481,6 +484,7 @@ public class ConnectionManager implements ChannelCreationListener {
consistentId,
connectionId,
descriptorProvider,
+ bootstrapFactory,
staleIdDetector,
this,
stopping::get
@@ -501,6 +505,7 @@ public class ConnectionManager implements ChannelCreationListener {
consistentId,
FACTORY,
descriptorProvider,
+ bootstrapFactory,
staleIdDetector,
this,
stopping::get
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
index 6e790dd419..7f54fbe2e1 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
@@ -22,13 +22,13 @@ import static java.util.Collections.emptyList;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
-import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.EventExecutor;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.netty.ChannelEventLoopsSource;
import org.apache.ignite.internal.network.netty.ChannelKey;
import org.apache.ignite.internal.network.netty.NettySender;
import org.apache.ignite.internal.network.netty.NettyUtils;
@@ -37,6 +37,8 @@ import org.apache.ignite.internal.network.recovery.message.HandshakeRejectionRea
import org.apache.ignite.network.OutNetworkObject;
class HandshakeManagerUtils {
+ private static final IgniteLogger LOG = Loggers.forClass(HandshakeManagerUtils.class);
+
private static final NetworkMessagesFactory MESSAGE_FACTORY = new NetworkMessagesFactory();
static void sendRejectionMessageAndFailHandshake(
@@ -76,14 +78,36 @@ class HandshakeManagerUtils {
*
* @param channel Channel to move.
* @param channelKey Key of the logical connection.
+ * @param eventLoopsSource Used to get all event loops with which a channel might be registered.
* @param afterSwitching Action to execute after switching (it will be executed on the new event loop).
*/
- static void switchEventLoopIfNeeded(Channel channel, ChannelKey channelKey, Runnable afterSwitching) {
- EventLoop targetEventLoop = eventLoopForKey(channelKey, channel);
+ static void switchEventLoopIfNeeded(
+ Channel channel,
+ ChannelKey channelKey,
+ ChannelEventLoopsSource eventLoopsSource,
+ Runnable afterSwitching
+ ) {
+ EventLoop targetEventLoop = eventLoopForKey(channelKey, eventLoopsSource);
if (targetEventLoop != channel.eventLoop()) {
channel.deregister().addListener(deregistrationFuture -> {
+ if (!deregistrationFuture.isSuccess()) {
+ LOG.error("Cannot deregister a channel from an event loop", deregistrationFuture.cause());
+
+ channel.close();
+
+ return;
+ }
+
targetEventLoop.register(channel).addListener(registrationFuture -> {
+ if (!registrationFuture.isSuccess()) {
+ LOG.error("Cannot register a channel with an event loop", registrationFuture.cause());
+
+ channel.close();
+
+ return;
+ }
+
afterSwitching.run();
});
});
@@ -92,17 +116,8 @@ class HandshakeManagerUtils {
}
}
- private static EventLoop eventLoopForKey(ChannelKey channelKey, Channel channel) {
- EventLoopGroup group = channel.eventLoop().parent();
- if (group == null) {
- // EmbeddedEventLoop#parent() returns null, handle this.
- return channel.eventLoop();
- }
-
- List<EventLoop> eventLoops = new ArrayList<>();
- for (EventExecutor childExecutor : group) {
- eventLoops.add((EventLoop) childExecutor);
- }
+ private static EventLoop eventLoopForKey(ChannelKey channelKey, ChannelEventLoopsSource eventLoopsSource) {
+ List<EventLoop> eventLoops = eventLoopsSource.channelEventLoops();
int index = (channelKey.hashCode() & Integer.MAX_VALUE) % eventLoops.size();
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index 6247f94100..3c79607534 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsExceptio
import org.apache.ignite.internal.network.handshake.HandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.netty.ChannelCreationListener;
+import org.apache.ignite.internal.network.netty.ChannelEventLoopsSource;
import org.apache.ignite.internal.network.netty.ChannelKey;
import org.apache.ignite.internal.network.netty.HandshakeHandler;
import org.apache.ignite.internal.network.netty.MessageHandler;
@@ -72,6 +73,8 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
/** Recovery descriptor provider. */
private final RecoveryDescriptorProvider recoveryDescriptorProvider;
+ private final ChannelEventLoopsSource channelEventLoopsSource;
+
/** Used to detect that a peer uses a stale ID. */
private final StaleIdDetector staleIdDetector;
@@ -122,6 +125,7 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
String consistentId,
short connectionId,
RecoveryDescriptorProvider recoveryDescriptorProvider,
+ ChannelEventLoopsSource channelEventLoopsSource,
StaleIdDetector staleIdDetector,
ChannelCreationListener channelCreationListener,
BooleanSupplier stopping
@@ -130,6 +134,7 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
this.consistentId = consistentId;
this.connectionId = connectionId;
this.recoveryDescriptorProvider = recoveryDescriptorProvider;
+ this.channelEventLoopsSource = channelEventLoopsSource;
this.staleIdDetector = staleIdDetector;
this.stopping = stopping;
@@ -230,7 +235,7 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
this.remoteConsistentId = handshakeStartMessage.consistentId();
ChannelKey channelKey = new ChannelKey(remoteConsistentId, remoteLaunchId, connectionId);
- switchEventLoopIfNeeded(channel, channelKey, () -> proceedAfterSavingIds(handshakeStartMessage));
+ switchEventLoopIfNeeded(channel, channelKey, channelEventLoopsSource, () -> proceedAfterSavingIds(handshakeStartMessage));
}
private void proceedAfterSavingIds(HandshakeStartMessage handshakeStartMessage) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
index e38bee24f9..fb227db5be 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.handshake.HandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.netty.ChannelCreationListener;
+import org.apache.ignite.internal.network.netty.ChannelEventLoopsSource;
import org.apache.ignite.internal.network.netty.ChannelKey;
import org.apache.ignite.internal.network.netty.HandshakeHandler;
import org.apache.ignite.internal.network.netty.MessageHandler;
@@ -96,6 +97,8 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
/** Recovery descriptor provider. */
private final RecoveryDescriptorProvider recoveryDescriptorProvider;
+ private final ChannelEventLoopsSource channelEventLoopsSource;
+
/** Used to detect that a peer uses a stale ID. */
private final StaleIdDetector staleIdDetector;
@@ -120,6 +123,7 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
String consistentId,
NetworkMessagesFactory messageFactory,
RecoveryDescriptorProvider recoveryDescriptorProvider,
+ ChannelEventLoopsSource channelEventLoopsSource,
StaleIdDetector staleIdDetector,
ChannelCreationListener channelCreationListener,
BooleanSupplier stopping
@@ -128,6 +132,7 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
this.consistentId = consistentId;
this.messageFactory = messageFactory;
this.recoveryDescriptorProvider = recoveryDescriptorProvider;
+ this.channelEventLoopsSource = channelEventLoopsSource;
this.staleIdDetector = staleIdDetector;
this.stopping = stopping;
@@ -218,7 +223,7 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
this.remoteChannelId = message.connectionId();
ChannelKey channelKey = new ChannelKey(remoteConsistentId, remoteLaunchId, remoteChannelId);
- switchEventLoopIfNeeded(channel, channelKey, () -> tryAcquireDescriptorAndFinishHandshake(message));
+ switchEventLoopIfNeeded(channel, channelKey, channelEventLoopsSource, () -> tryAcquireDescriptorAndFinishHandshake(message));
}
private boolean possiblyRejectHandshakeStartResponse(HandshakeStartResponseMessage message) {
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
index 7c8d168629..c270bfe251 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
@@ -23,10 +23,12 @@ import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.EventExecutor;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.manager.IgniteComponent;
@@ -34,6 +36,7 @@ import org.apache.ignite.internal.network.configuration.InboundView;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.configuration.NetworkView;
import org.apache.ignite.internal.network.configuration.OutboundView;
+import org.apache.ignite.internal.network.netty.ChannelEventLoopsSource;
import org.apache.ignite.internal.network.netty.NamedNioEventLoopGroup;
import org.apache.ignite.internal.network.netty.NamedNioEventLoopGroup.NetworkThread;
import org.jetbrains.annotations.TestOnly;
@@ -41,7 +44,7 @@ import org.jetbrains.annotations.TestOnly;
/**
* Netty bootstrap factory. Holds shared {@link EventLoopGroup} instances and encapsulates common Netty {@link Bootstrap} creation logic.
*/
-public class NettyBootstrapFactory implements IgniteComponent {
+public class NettyBootstrapFactory implements IgniteComponent, ChannelEventLoopsSource {
/** Network configuration. */
private final NetworkConfiguration networkConfiguration;
@@ -49,13 +52,16 @@ public class NettyBootstrapFactory implements IgniteComponent {
private final String eventLoopGroupNamePrefix;
/** Server boss socket channel handler event loop group. */
- private NioEventLoopGroup bossGroup;
+ private EventLoopGroup bossGroup;
/** Server work socket channel handler event loop group. */
- private NioEventLoopGroup workerGroup;
+ private EventLoopGroup workerGroup;
/** Client socket channel handler event loop group. */
- private NioEventLoopGroup clientWorkerGroup;
+ private EventLoopGroup clientWorkerGroup;
+
+ /** All event loops with which {@link io.netty.channel.Channel}s might be registered. */
+ private volatile List<EventLoop> channelEventLoops;
/**
* Constructor.
@@ -138,7 +144,7 @@ public class NettyBootstrapFactory implements IgniteComponent {
/**
* Returns all event loop groups managed by this factory.
*/
- List<NioEventLoopGroup> eventLoopGroups() {
+ List<EventLoopGroup> eventLoopGroups() {
return List.of(bossGroup, workerGroup, clientWorkerGroup);
}
@@ -149,9 +155,23 @@ public class NettyBootstrapFactory implements IgniteComponent {
workerGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix + "-srv-worker");
clientWorkerGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix + "-client");
+ this.channelEventLoops = List.copyOf(eventLoopsAt(workerGroup, clientWorkerGroup));
+
return nullCompletedFuture();
}
+ private static List<EventLoop> eventLoopsAt(EventLoopGroup ... groups) {
+ List<EventLoop> channelEventLoops = new ArrayList<>();
+
+ for (EventLoopGroup group : groups) {
+ for (EventExecutor child : group) {
+ channelEventLoops.add((EventLoop) child);
+ }
+ }
+
+ return channelEventLoops;
+ }
+
/**
* Returns {@code true} if the current thread is a network thread, {@code false} otherwise.
*
@@ -176,11 +196,16 @@ public class NettyBootstrapFactory implements IgniteComponent {
}
+ @Override
+ public List<EventLoop> channelEventLoops() {
+ return channelEventLoops;
+ }
+
/**
* Returns worker event loop group.
*/
@TestOnly
- public NioEventLoopGroup serverEventLoopGroup() {
+ public EventLoopGroup serverEventLoopGroup() {
return workerGroup;
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NettyWorkersRegistrar.java b/modules/network/src/main/java/org/apache/ignite/network/NettyWorkersRegistrar.java
index cf60d08d39..a7e0a82089 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NettyWorkersRegistrar.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NettyWorkersRegistrar.java
@@ -20,7 +20,7 @@ package org.apache.ignite.network;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import java.util.ArrayList;
@@ -83,7 +83,7 @@ public class NettyWorkersRegistrar implements IgniteComponent {
@Override
public CompletableFuture<Void> start() {
List<NettyWorker> nettyWorkers = new ArrayList<>();
- for (NioEventLoopGroup group : bootstrapFactory.eventLoopGroups()) {
+ for (EventLoopGroup group : bootstrapFactory.eventLoopGroups()) {
registerWorkersFor(group, nettyWorkers);
}
workers = List.copyOf(nettyWorkers);
@@ -94,8 +94,8 @@ public class NettyWorkersRegistrar implements IgniteComponent {
return nullCompletedFuture();
}
- private void registerWorkersFor(NioEventLoopGroup group, List<NettyWorker> nettyWorkers) {
- List<NettyWorker> groupWorkers = new ArrayList<>(group.executorCount());
+ private void registerWorkersFor(EventLoopGroup group, List<NettyWorker> nettyWorkers) {
+ List<NettyWorker> groupWorkers = new ArrayList<>();
for (EventExecutor eventExecutor : group) {
SingleThreadEventExecutor executor = (SingleThreadEventExecutor) eventExecutor;
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index a5eb3c2dad..4d1376345f 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -26,8 +26,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
import io.netty.channel.embedded.EmbeddedChannel;
import java.util.Collections;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -81,12 +83,14 @@ public class RecoveryHandshakeTest {
RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
- RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);
- RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(serverRecovery);
+ EmbeddedChannel clientSideChannel = createUnregisteredChannel();
+ EmbeddedChannel serverSideChannel = createUnregisteredChannel();
- EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, noMessageListener);
+ RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(clientSideChannel, clientRecovery);
+ RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(serverSideChannel, serverRecovery);
- EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, noMessageListener);
+ setupChannel(clientSideChannel, clientHandshakeManager, noMessageListener);
+ setupChannel(serverSideChannel, serverHandshakeManager, noMessageListener);
assertTrue(serverSideChannel.isActive());
@@ -112,16 +116,23 @@ public class RecoveryHandshakeTest {
RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+ EmbeddedChannel clientSideChannel = createUnregisteredChannel();
+ EmbeddedChannel serverSideChannel = createUnregisteredChannel();
+
UUID clientLaunchId = UUID.randomUUID();
RecoveryDescriptor serverRecoveryDescriptor = serverRecovery.getRecoveryDescriptor("client", clientLaunchId, CONNECTION_ID);
addUnacknowledgedMessages(serverRecoveryDescriptor);
- RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager("client", clientLaunchId,
- clientRecovery);
- RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(serverRecovery);
+ RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(
+ clientSideChannel,
+ "client",
+ clientLaunchId,
+ clientRecovery
+ );
+ RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(serverSideChannel, serverRecovery);
var messageCaptor = new AtomicReference<TestMessage>();
- EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, (inObject) -> {
+ setupChannel(clientSideChannel, clientHandshakeManager, (inObject) -> {
NetworkMessage msg = inObject.message();
assertInstanceOf(TestMessage.class, msg);
@@ -129,7 +140,7 @@ public class RecoveryHandshakeTest {
messageCaptor.set((TestMessage) msg);
});
- EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, noMessageListener);
+ setupChannel(serverSideChannel, serverHandshakeManager, noMessageListener);
assertTrue(serverSideChannel.isActive());
@@ -165,18 +176,25 @@ public class RecoveryHandshakeTest {
RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+ EmbeddedChannel clientSideChannel = createUnregisteredChannel();
+ EmbeddedChannel serverSideChannel = createUnregisteredChannel();
+
UUID serverLaunchId = UUID.randomUUID();
RecoveryDescriptor clientRecoveryDescriptor = clientRecovery.getRecoveryDescriptor("server", serverLaunchId, CONNECTION_ID);
addUnacknowledgedMessages(clientRecoveryDescriptor);
- RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);
- RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager("server", serverLaunchId,
- serverRecovery);
+ RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(clientSideChannel, clientRecovery);
+ RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(
+ serverSideChannel,
+ "server",
+ serverLaunchId,
+ serverRecovery
+ );
var messageCaptor = new AtomicReference<TestMessage>();
- EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, noMessageListener);
+ setupChannel(clientSideChannel, clientHandshakeManager, noMessageListener);
- EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, (inObject) -> {
+ setupChannel(serverSideChannel, serverHandshakeManager, (inObject) -> {
NetworkMessage msg = inObject.message();
assertInstanceOf(TestMessage.class, msg);
@@ -218,25 +236,30 @@ public class RecoveryHandshakeTest {
RecoveryDescriptorProvider node1Recovery = createRecoveryDescriptorProvider();
RecoveryDescriptorProvider node2Recovery = createRecoveryDescriptorProvider();
+ EmbeddedChannel channel1Src = createUnregisteredChannel();
+ EmbeddedChannel channel1Dst = createUnregisteredChannel();
+ EmbeddedChannel channel2Src = createUnregisteredChannel();
+ EmbeddedChannel channel2Dst = createUnregisteredChannel();
+
UUID node1Uuid = LOWER_UUID;
UUID node2Uuid = HIGHER_UUID;
- RecoveryClientHandshakeManager chm1 = createRecoveryClientHandshakeManager("client", node1Uuid, node1Recovery);
- RecoveryServerHandshakeManager shm1 = createRecoveryServerHandshakeManager("client", node1Uuid, node1Recovery);
+ RecoveryClientHandshakeManager chm1 = createRecoveryClientHandshakeManager(channel1Src, "client", node1Uuid, node1Recovery);
+ RecoveryServerHandshakeManager shm1 = createRecoveryServerHandshakeManager(channel2Dst, "client", node1Uuid, node1Recovery);
- RecoveryClientHandshakeManager chm2 = createRecoveryClientHandshakeManager("server", node2Uuid, node2Recovery);
- RecoveryServerHandshakeManager shm2 = createRecoveryServerHandshakeManager("server", node2Uuid, node2Recovery);
+ RecoveryClientHandshakeManager chm2 = createRecoveryClientHandshakeManager(channel2Src, "server", node2Uuid, node2Recovery);
+ RecoveryServerHandshakeManager shm2 = createRecoveryServerHandshakeManager(channel1Dst, "server", node2Uuid, node2Recovery);
// Channel opened from node1 to node2 is channel 1.
// Channel opened from node2 to node1 is channel 2.
// Channel 1.
- EmbeddedChannel channel1Src = setupChannel(chm1, noMessageListener);
- EmbeddedChannel channel1Dst = setupChannel(shm2, noMessageListener);
+ setupChannel(channel1Src, chm1, noMessageListener);
+ setupChannel(channel1Dst, shm2, noMessageListener);
// Channel 2.
- EmbeddedChannel channel2Src = setupChannel(chm2, noMessageListener);
- EmbeddedChannel channel2Dst = setupChannel(shm1, noMessageListener);
+ setupChannel(channel2Src, chm2, noMessageListener);
+ setupChannel(channel2Dst, shm1, noMessageListener);
exchangeServerToClient(channel2Dst, channel2Src);
exchangeServerToClient(channel1Dst, channel1Src);
@@ -273,25 +296,30 @@ public class RecoveryHandshakeTest {
RecoveryDescriptorProvider node1Recovery = createRecoveryDescriptorProvider();
RecoveryDescriptorProvider node2Recovery = createRecoveryDescriptorProvider();
+ EmbeddedChannel channel1Src = createUnregisteredChannel();
+ EmbeddedChannel channel1Dst = createUnregisteredChannel();
+ EmbeddedChannel channel2Src = createUnregisteredChannel();
+ EmbeddedChannel channel2Dst = createUnregisteredChannel();
+
UUID node1Uuid = node1LaunchIdIsLower ? LOWER_UUID : HIGHER_UUID;
UUID node2Uuid = node1LaunchIdIsLower ? HIGHER_UUID : LOWER_UUID;
- RecoveryClientHandshakeManager chm1 = createRecoveryClientHandshakeManager("client", node1Uuid, node1Recovery);
- RecoveryServerHandshakeManager shm1 = createRecoveryServerHandshakeManager("client", node1Uuid, node1Recovery);
+ RecoveryClientHandshakeManager chm1 = createRecoveryClientHandshakeManager(channel1Src, "client", node1Uuid, node1Recovery);
+ RecoveryServerHandshakeManager shm1 = createRecoveryServerHandshakeManager(channel2Dst, "client", node1Uuid, node1Recovery);
- RecoveryClientHandshakeManager chm2 = createRecoveryClientHandshakeManager("server", node2Uuid, node2Recovery);
- RecoveryServerHandshakeManager shm2 = createRecoveryServerHandshakeManager("server", node2Uuid, node2Recovery);
+ RecoveryClientHandshakeManager chm2 = createRecoveryClientHandshakeManager(channel2Src, "server", node2Uuid, node2Recovery);
+ RecoveryServerHandshakeManager shm2 = createRecoveryServerHandshakeManager(channel1Dst, "server", node2Uuid, node2Recovery);
// Channel opened from node1 to node2 is channel 1.
// Channel opened from node2 to node1 is channel 2.
// Channel 1.
- EmbeddedChannel channel1Src = setupChannel(chm1, noMessageListener);
- EmbeddedChannel channel1Dst = setupChannel(shm2, noMessageListener);
+ setupChannel(channel1Src, chm1, noMessageListener);
+ setupChannel(channel1Dst, shm2, noMessageListener);
// Channel 2.
- EmbeddedChannel channel2Src = setupChannel(chm2, noMessageListener);
- EmbeddedChannel channel2Dst = setupChannel(shm1, noMessageListener);
+ setupChannel(channel2Src, chm2, noMessageListener);
+ setupChannel(channel2Dst, shm1, noMessageListener);
// Channel 2's handshake acquires both locks.
exchangeServerToClient(channel2Dst, channel2Src);
@@ -339,17 +367,28 @@ public class RecoveryHandshakeTest {
RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
- RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(client, clientLaunchId,
- clientRecovery);
- RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(server, serverLaunchId,
- serverRecovery);
+ EmbeddedChannel clientSideChannel = createUnregisteredChannel();
+ EmbeddedChannel serverSideChannel = createUnregisteredChannel();
+
+ RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(
+ clientSideChannel,
+ client,
+ clientLaunchId,
+ clientRecovery
+ );
+ RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(
+ serverSideChannel,
+ server,
+ serverLaunchId,
+ serverRecovery
+ );
var receivedFirst = new AtomicBoolean();
var listener1 = new MessageListener("1", receivedFirst);
- EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, serverDidntReceiveAck ? listener1 : noMessageListener);
- EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, serverDidntReceiveAck ? noMessageListener : listener1);
+ setupChannel(clientSideChannel, clientHandshakeManager, serverDidntReceiveAck ? listener1 : noMessageListener);
+ setupChannel(serverSideChannel, serverHandshakeManager, serverDidntReceiveAck ? noMessageListener : listener1);
// Normal handshake
exchangeServerToClient(serverSideChannel, clientSideChannel);
@@ -380,8 +419,8 @@ public class RecoveryHandshakeTest {
}
// Simulate reconnection
- clientHandshakeManager = createRecoveryClientHandshakeManager(client, clientLaunchId, clientRecovery);
- serverHandshakeManager = createRecoveryServerHandshakeManager(server, serverLaunchId, serverRecovery);
+ clientHandshakeManager = createRecoveryClientHandshakeManager(clientSideChannel, client, clientLaunchId, clientRecovery);
+ serverHandshakeManager = createRecoveryServerHandshakeManager(serverSideChannel, server, serverLaunchId, serverRecovery);
var receivedSecond = new AtomicBoolean();
@@ -390,8 +429,11 @@ public class RecoveryHandshakeTest {
clientSideChannel.finishAndReleaseAll();
serverSideChannel.finishAndReleaseAll();
- clientSideChannel = setupChannel(clientHandshakeManager, serverDidntReceiveAck ? listener2 : noMessageListener);
- serverSideChannel = setupChannel(serverHandshakeManager, serverDidntReceiveAck ? noMessageListener : listener2);
+ clientSideChannel = createUnregisteredChannel();
+ serverSideChannel = createUnregisteredChannel();
+
+ setupChannel(clientSideChannel, clientHandshakeManager, serverDidntReceiveAck ? listener2 : noMessageListener);
+ setupChannel(serverSideChannel, serverHandshakeManager, serverDidntReceiveAck ? noMessageListener : listener2);
// Handshake
exchangeServerToClient(serverSideChannel, clientSideChannel);
@@ -426,17 +468,20 @@ public class RecoveryHandshakeTest {
RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
- RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);
+ EmbeddedChannel clientSideChannel = createUnregisteredChannel();
+ EmbeddedChannel serverSideChannel = createUnregisteredChannel();
+
+ RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(clientSideChannel, clientRecovery);
RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(
+ serverSideChannel,
"server",
UUID.randomUUID(),
serverRecovery,
new AllIdsAreStale()
);
- EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, noMessageListener);
-
- EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, noMessageListener);
+ setupChannel(clientSideChannel, clientHandshakeManager, noMessageListener);
+ setupChannel(serverSideChannel, serverHandshakeManager, noMessageListener);
assertTrue(serverSideChannel.isActive());
@@ -462,17 +507,20 @@ public class RecoveryHandshakeTest {
RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+ EmbeddedChannel clientSideChannel = createUnregisteredChannel();
+ EmbeddedChannel serverSideChannel = createUnregisteredChannel();
+
RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(
+ clientSideChannel,
"client",
UUID.randomUUID(),
clientRecovery,
new AllIdsAreStale()
);
- RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(serverRecovery);
-
- EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, noMessageListener);
+ RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(serverSideChannel, serverRecovery);
- EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, noMessageListener);
+ setupChannel(clientSideChannel, clientHandshakeManager, noMessageListener);
+ setupChannel(serverSideChannel, serverHandshakeManager, noMessageListener);
assertTrue(serverSideChannel.isActive());
@@ -581,19 +629,20 @@ public class RecoveryHandshakeTest {
private final Consumer<InNetworkObject> noMessageListener = inNetworkObject ->
fail("Received message while shouldn't have, [" + inNetworkObject.message() + "]");
- private EmbeddedChannel setupChannel(HandshakeManager handshakeManager, Consumer<InNetworkObject> messageListener) throws Exception {
- // Channel should not be registered at first, not before we add pipeline handlers
- // Otherwise, events like "channel active" won't be propagated to the handlers
- var channel = new EmbeddedChannel(false, false);
-
+ private void setupChannel(EmbeddedChannel channel, HandshakeManager handshakeManager, Consumer<InNetworkObject> messageListener)
+ throws Exception {
var serializationService = new SerializationService(MESSAGE_REGISTRY, createUserObjectSerializationContext());
var sessionSerializationService = new PerSessionSerializationService(serializationService);
PipelineUtils.setup(channel.pipeline(), sessionSerializationService, handshakeManager, messageListener);
channel.register();
+ }
- return channel;
+ private static EmbeddedChannel createUnregisteredChannel() {
+ // Channel should not be registered at first, not before we add pipeline handlers
+ // Otherwise, events like "channel active" won't be propagated to the handlers
+ return new EmbeddedChannel(false, false);
}
private UserObjectSerializationContext createUserObjectSerializationContext() {
@@ -606,44 +655,70 @@ public class RecoveryHandshakeTest {
userObjectMarshaller);
}
- private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(RecoveryDescriptorProvider provider) {
- return createRecoveryClientHandshakeManager("client", UUID.randomUUID(), provider);
+ private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(
+ Channel clientSideChannel,
+ RecoveryDescriptorProvider provider
+ ) {
+ return createRecoveryClientHandshakeManager(clientSideChannel, "client", UUID.randomUUID(), provider);
}
- private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(String consistentId, UUID launchId,
- RecoveryDescriptorProvider provider) {
- return createRecoveryClientHandshakeManager(consistentId, launchId, provider, new AllIdsAreFresh());
+ private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(
+ Channel clientSideChannel,
+ String consistentId,
+ UUID launchId,
+ RecoveryDescriptorProvider provider
+ ) {
+ return createRecoveryClientHandshakeManager(clientSideChannel, consistentId, launchId, provider, new AllIdsAreFresh());
}
- private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(String consistentId, UUID launchId,
- RecoveryDescriptorProvider provider, StaleIdDetector staleIdDetector) {
+ private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(
+ Channel clientSideChannel,
+ String consistentId,
+ UUID launchId,
+ RecoveryDescriptorProvider provider,
+ StaleIdDetector staleIdDetector
+ ) {
return new RecoveryClientHandshakeManager(
launchId,
consistentId,
CONNECTION_ID,
provider,
+ () -> List.of(clientSideChannel.eventLoop()),
staleIdDetector,
channel -> {},
() -> false
);
}
- private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(RecoveryDescriptorProvider provider) {
- return createRecoveryServerHandshakeManager("server", UUID.randomUUID(), provider);
+ private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(
+ Channel serverSideChannel,
+ RecoveryDescriptorProvider provider
+ ) {
+ return createRecoveryServerHandshakeManager(serverSideChannel, "server", UUID.randomUUID(), provider);
}
- private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(String consistentId, UUID launchId,
- RecoveryDescriptorProvider provider) {
- return createRecoveryServerHandshakeManager(consistentId, launchId, provider, new AllIdsAreFresh());
+ private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(
+ Channel serverSideChannel,
+ String consistentId,
+ UUID launchId,
+ RecoveryDescriptorProvider provider
+ ) {
+ return createRecoveryServerHandshakeManager(serverSideChannel, consistentId, launchId, provider, new AllIdsAreFresh());
}
- private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(String consistentId, UUID launchId,
- RecoveryDescriptorProvider provider, StaleIdDetector staleIdDetector) {
+ private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(
+ Channel serverSideChannel,
+ String consistentId,
+ UUID launchId,
+ RecoveryDescriptorProvider provider,
+ StaleIdDetector staleIdDetector
+ ) {
return new RecoveryServerHandshakeManager(
launchId,
consistentId,
MESSAGE_FACTORY,
provider,
+ () -> List.of(serverSideChannel.eventLoop()),
staleIdDetector,
channel -> {},
() -> false
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java
index 0edb72fbc7..c7afffd61d 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java
@@ -40,7 +40,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelProgressivePromise;
import io.netty.channel.EventLoop;
-import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import java.util.List;
import java.util.UUID;
@@ -105,9 +104,6 @@ class RecoveryClientHandshakeManagerTest extends BaseIgniteAbstractTest {
@Mock
private EventLoop eventLoop;
- @Mock
- private EventLoopGroup eventLoopGroup;
-
@Mock
private NettySender competitorNettySender;
@@ -127,8 +123,6 @@ class RecoveryClientHandshakeManagerTest extends BaseIgniteAbstractTest {
lenient().when(eventExecutor.inEventLoop()).thenReturn(true);
lenient().when(thisChannel.eventLoop()).thenReturn(eventLoop);
- lenient().when(eventLoop.parent()).thenReturn(eventLoopGroup);
- lenient().when(eventLoopGroup.iterator()).thenReturn(List.of((EventExecutor) eventLoop).iterator());
lenient().when(recoveryDescriptorProvider.getRecoveryDescriptor(any(), any(), anyShort()))
.thenReturn(recoveryDescriptor);
@@ -182,6 +176,7 @@ class RecoveryClientHandshakeManagerTest extends BaseIgniteAbstractTest {
CLIENT_CONSISTENT_ID,
CONNECTION_INDEX,
recoveryDescriptorProvider,
+ () -> List.of(thisChannel.eventLoop()),
new AllIdsAreFresh(),
channelCreationListener,
stopping
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java
index 98a5b9641d..0950eb3414 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java
@@ -39,7 +39,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelProgressivePromise;
import io.netty.channel.EventLoop;
-import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import java.util.List;
import java.util.UUID;
@@ -96,9 +95,6 @@ class RecoveryServerHandshakeManagerTest extends BaseIgniteAbstractTest {
@Mock
private EventLoop eventLoop;
- @Mock
- private EventLoopGroup eventLoopGroup;
-
@Captor
private ArgumentCaptor<OutNetworkObject> sentMessageCaptor;
@@ -120,8 +116,6 @@ class RecoveryServerHandshakeManagerTest extends BaseIgniteAbstractTest {
lenient().when(eventExecutor.inEventLoop()).thenReturn(true);
lenient().when(channel.eventLoop()).thenReturn(eventLoop);
- lenient().when(eventLoop.parent()).thenReturn(eventLoopGroup);
- lenient().when(eventLoopGroup.iterator()).thenReturn(List.of((EventExecutor) eventLoop).iterator());
lenient().when(channel.writeAndFlush(any())).then(invocation -> {
DefaultChannelProgressivePromise future = new DefaultChannelProgressivePromise(channel, eventExecutor);
@@ -173,6 +167,7 @@ class RecoveryServerHandshakeManagerTest extends BaseIgniteAbstractTest {
SERVER_CONSISTENT_ID,
MESSAGE_FACTORY,
recoveryDescriptorProvider,
+ () -> List.of(channel.eventLoop()),
new AllIdsAreFresh(),
channelCreationListener,
stopping
diff --git a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
index df0b826df4..4398c93cc1 100644
--- a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
@@ -474,7 +474,7 @@ class DefaultMessagingServiceTest extends BaseIgniteAbstractTest {
node.name(),
bootstrapFactory,
staleIdDetector,
- clientHandshakeManagerFactoryAdding(beforeHandshake, staleIdDetector)
+ clientHandshakeManagerFactoryAdding(beforeHandshake, bootstrapFactory, staleIdDetector)
);
connectionManager.start();
@@ -484,19 +484,21 @@ class DefaultMessagingServiceTest extends BaseIgniteAbstractTest {
}
private static RecoveryClientHandshakeManagerFactory clientHandshakeManagerFactoryAdding(Runnable beforeHandshake,
- StaleIdDetector staleIdDetector) {
+ NettyBootstrapFactory bootstrapFactory, StaleIdDetector staleIdDetector) {
return new RecoveryClientHandshakeManagerFactory() {
@Override
public RecoveryClientHandshakeManager create(
UUID launchId,
String consistentId,
short connectionId,
- RecoveryDescriptorProvider recoveryDescriptorProvider) {
+ RecoveryDescriptorProvider recoveryDescriptorProvider
+ ) {
return new RecoveryClientHandshakeManager(
launchId,
consistentId,
connectionId,
recoveryDescriptorProvider,
+ bootstrapFactory,
staleIdDetector,
channel -> {},
() -> false