You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/05/13 15:41:25 UTC
[ignite-3] branch main updated: IGNITE-14085 Implement network recovery protocol
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8c075cdd8 IGNITE-14085 Implement network recovery protocol
8c075cdd8 is described below
commit 8c075cdd8defd5ff811c1fa68864650d22c7d06e
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Sun May 1 01:59:42 2022 +0400
IGNITE-14085 Implement network recovery protocol
---
.../internal/network/processor/MessageClass.java | 1 +
.../org/apache/ignite/network/NetworkMessage.java | 11 +-
.../network/netty/ItConnectionManagerTest.java | 7 +-
.../network/recovery/ItRecoveryHandshakeTest.java | 478 ---------------------
.../internal/network/NetworkMessageTypes.java | 18 +-
.../network/handshake/HandshakeManager.java | 25 +-
.../internal/network/netty/ConnectionManager.java | 74 ++--
.../netty/DefaultRecoveryDescriptorProvider.java | 109 +++++
.../internal/network/netty/HandshakeHandler.java | 61 +--
.../internal/network/netty/InboundDecoder.java | 3 +
.../network/netty/InboundRecoveryHandler.java | 72 ++++
.../netty/IoExceptionSuppressingHandler.java | 3 +
.../internal/network/netty/MessageHandler.java | 8 +
.../ignite/internal/network/netty/NettyClient.java | 11 +-
.../ignite/internal/network/netty/NettyServer.java | 21 +-
.../internal/network/netty/OutboundEncoder.java | 3 +
.../network/netty/OutboundRecoveryHandler.java | 54 +++
.../internal/network/netty/PipelineUtils.java | 68 +++
.../recovery/RecoveryClientHandshakeManager.java | 167 +++++--
.../network/recovery/RecoveryDescriptor.java | 127 ++++++
...essage.java => RecoveryDescriptorProvider.java} | 27 +-
.../recovery/RecoveryServerHandshakeManager.java | 148 ++++++-
...artMessage.java => AcknowledgementMessage.java} | 24 +-
...artMessage.java => HandshakeFinishMessage.java} | 25 +-
.../recovery/message/HandshakeStartMessage.java | 6 +-
.../message/HandshakeStartResponseMessage.java | 12 +-
...shakeStartMessage.java => InternalMessage.java} | 26 +-
.../apache/ignite/network/OutNetworkObject.java | 27 ++
.../scalecube/ScaleCubeClusterServiceFactory.java | 5 +-
.../internal/network/netty/NettyClientTest.java | 16 +-
.../internal/network/netty/NettyServerTest.java | 11 +-
.../network/netty/RecoveryHandshakeTest.java | 440 +++++++++++++++++++
32 files changed, 1315 insertions(+), 773 deletions(-)
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/MessageClass.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/MessageClass.java
index d00712727..caeab8d6d 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/MessageClass.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/MessageClass.java
@@ -81,6 +81,7 @@ public class MessageClass {
.filter(e -> !typeUtils.isSameType(e.asType(), NetworkMessage.class))
.flatMap(e -> e.getEnclosedElements().stream())
.filter(e -> e.getKind() == ElementKind.METHOD)
+ .filter(e -> !((ExecutableElement) e).isDefault())
// use a tree map to sort getters by name and remove duplicates
.collect(Collectors.toMap(
e -> e.getSimpleName().toString(),
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java
index c31acf7ca..e0d00cf1e 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java
@@ -24,7 +24,7 @@ import it.unimi.dsi.fastutil.ints.IntSet;
*/
public interface NetworkMessage {
/** Size of the message type (in bytes), used during (de-)serialization. */
- static final int MSG_TYPE_SIZE_BYTES = 4;
+ int MSG_TYPE_SIZE_BYTES = 4;
/**
* Message type. Must be <b>distinct</b> among all messages in a <i>message group</i>. Only positive values are allowed.
@@ -51,4 +51,13 @@ public interface NetworkMessage {
default void unmarshal(Object marshaller, Object descriptors) throws Exception {
// No-op.
}
+
+ /**
+ * Returns {@code true} if this message needs an acknowledgement from the remote node, {@code false} otherwise.
+ *
+ * @return {@code true} if this message needs an acknowledgement from the remote node, {@code false} otherwise.
+ */
+ default boolean needAck() {
+ return true;
+ }
}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index 9f9b95786..fe4315490 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -46,8 +46,6 @@ import org.apache.ignite.configuration.schemas.network.NetworkView;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
-import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
-import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -147,7 +145,7 @@ public class ItConnectionManagerTest {
NettySender senderFrom1to2 = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
// Ensure a handshake has finished on both sides by sending a message.
- // TODO: IGNITE-14085 When the recovery protocol is implemented replace this with simple
+ // TODO: IGNITE-16947 When the recovery protocol is implemented replace this with simple
// CompletableFuture#get called on the send future.
var messageReceivedOn2 = new CompletableFuture<Void>();
@@ -353,9 +351,8 @@ public class ItConnectionManagerTest {
var manager = new ConnectionManager(
cfg,
new SerializationService(registry, mock(UserObjectSerializationContext.class)),
+ launchId,
consistentId,
- () -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
- () -> new RecoveryClientHandshakeManager(launchId, consistentId, messageFactory),
bootstrapFactory
);
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
deleted file mode 100644
index f081d195d..000000000
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.network.recovery;
-
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_CONNECTION_OPENED;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_DOESNT_FAIL;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_INIT;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_SERVER_RESPONDED;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CLIENT_RESPONDED;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CONNECTION_OPENED;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_DOESNT_FAIL;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_INIT;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-
-import io.netty.channel.Channel;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
-import org.apache.ignite.configuration.schemas.network.NetworkView;
-import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.network.NetworkMessagesFactory;
-import org.apache.ignite.internal.network.handshake.HandshakeResult;
-import org.apache.ignite.internal.network.netty.ConnectionManager;
-import org.apache.ignite.internal.network.netty.NettySender;
-import org.apache.ignite.internal.network.serialization.SerializationService;
-import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
-import org.apache.ignite.network.NettyBootstrapFactory;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.OutNetworkObject;
-import org.apache.ignite.network.TestMessage;
-import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
-import org.apache.ignite.network.TestMessagesFactory;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
-
-/**
- * Recovery protocol handshake tests.
- */
-@ExtendWith(ConfigurationExtension.class)
-public class ItRecoveryHandshakeTest {
- /** Started connection managers. */
- private final List<ConnectionManager> startedManagers = new ArrayList<>();
-
- /** Started bootstrap factories. */
- private final List<NettyBootstrapFactory> startedBootstrapFactories = new ArrayList<>();
-
- private final TestMessagesFactory messageFactory = new TestMessagesFactory();
-
- /** Reusable network configuration object. */
- @InjectConfiguration
- private NetworkConfiguration networkConfiguration;
-
- /**
- * After each.
- */
- @AfterEach
- final void tearDown() throws Exception {
- startedManagers.forEach(ConnectionManager::stop);
-
- for (NettyBootstrapFactory startedBootstrapFactory : startedBootstrapFactories) {
- startedBootstrapFactory.stop();
- }
- }
-
- /**
- * Tests handshake scenarios in which some of the parts of handshake protocol can fail.
- *
- * @param scenario Handshake scenario.
- * @throws Exception If failed.
- */
- @ParameterizedTest
- @MethodSource("handshakeScenarios")
- public void testHandshakeScenario(HandshakeScenario scenario) throws Exception {
- ConnectionManager manager1 = startManager(
- 4000,
- scenario.serverFailAt,
- CLIENT_DOESNT_FAIL
- );
-
- ConnectionManager manager2 = startManager(
- 4001,
- SERVER_DOESNT_FAIL,
- scenario.clientFailAt
- );
-
- NettySender from2to1;
-
- try {
- from2to1 = manager2.channel(manager1.consistentId(), manager1.getLocalAddress()).get(3, TimeUnit.SECONDS);
- } catch (Exception e) {
- if (scenario.clientFailAt == CLIENT_DOESNT_FAIL && scenario.serverFailAt == SERVER_DOESNT_FAIL) {
- Assertions.fail(e);
- }
-
- return;
- }
-
- if (scenario.clientFailAt != CLIENT_DOESNT_FAIL || scenario.serverFailAt != SERVER_DOESNT_FAIL) {
- Assertions.fail("Handshake should've failed");
- }
-
- assertNotNull(from2to1);
-
- // Ensure the handshake has finished on both sides.
- TestMessage msg = messageFactory.testMessage().msg("test").build();
- from2to1.send(new OutNetworkObject(msg, Collections.emptyList())).get(3, TimeUnit.SECONDS);
-
- NettySender from1to2 = manager1.channel(manager2.consistentId(), manager2.getLocalAddress()).get(3, TimeUnit.SECONDS);
-
- assertNotNull(from1to2);
-
- assertEquals(from2to1.channel().localAddress(), from1to2.channel().remoteAddress());
- }
-
- /**
- * Tests special handshake scenario: the client assumes a handshake has been finished, but the server fails on client's response. The
- * server will then close a connection and the client should get the "connection closed event".
- *
- * @throws Exception If failed.
- */
- @Test
- public void testHandshakeFailsOnServerWhenClientResponded() throws Exception {
- ConnectionManager manager1 = startManager(
- 4000,
- SERVER_CLIENT_RESPONDED,
- CLIENT_DOESNT_FAIL
- );
-
- ConnectionManager manager2 = startManager(
- 4001,
- SERVER_DOESNT_FAIL,
- CLIENT_DOESNT_FAIL
- );
-
- NettySender from2to1 = manager2.channel(manager1.consistentId(), manager1.getLocalAddress()).get(3, TimeUnit.SECONDS);
-
- from2to1.channel().closeFuture().get(3, TimeUnit.SECONDS);
- }
-
- /**
- * Tests that one server can handle multiple incoming connections and perform the handshake operation successfully.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testHandshakeWithMultipleClients() throws Exception {
- ConnectionManager server = startManager(4000);
-
- List<ConnectionManager> clients = new ArrayList<>();
-
- int clientCount = 10;
-
- for (int i = 0; i < clientCount; i++) {
- clients.add(startManager(4001 + i));
- }
-
- // A key is the client's consistent id, a value is the channel between the client and the server
- var channelsToServer = new HashMap<String, NettySender>();
-
- for (ConnectionManager client : clients) {
- channelsToServer.put(
- client.consistentId(),
- client.channel(server.consistentId(), server.getLocalAddress()).get(3, TimeUnit.SECONDS)
- );
- }
-
- assertTrue(waitForCondition(() -> server.channels().size() == clientCount, TimeUnit.SECONDS.toMillis(3)));
-
- Map<String, NettySender> channels = server.channels();
-
- // Assert that server's channels are inbound connections opened from clients
- channelsToServer.forEach((consistentId, toServer) -> {
- NettySender toClient = channels.get(consistentId);
-
- assertNotNull(toClient);
-
- assertEquals(toServer.channel().localAddress(), toClient.channel().remoteAddress());
- });
- }
-
- /**
- * Returns generates handshake scenarios.
- *
- * @return Generates handshake scenarios.
- */
- private static List<HandshakeScenario> handshakeScenarios() {
- ServerStageFail[] serverOpts = ServerStageFail.values();
-
- ClientStageFail[] clientOpts = ClientStageFail.values();
-
- List<HandshakeScenario> res = new ArrayList<>();
-
- for (ServerStageFail serverOpt : serverOpts) {
- for (ClientStageFail clientOpt : clientOpts) {
- // The case in if statement is handled in separate test
- if (serverOpt != SERVER_CLIENT_RESPONDED && clientOpt != CLIENT_DOESNT_FAIL) {
- res.add(new HandshakeScenario(serverOpt, clientOpt));
- }
- }
- }
-
- return res;
- }
-
- /** Handshake scenario. */
- private static class HandshakeScenario {
- /** Stage to fail server handshake at. */
- private final ServerStageFail serverFailAt;
-
- /** Stage to fail client handshake at. */
- private final ClientStageFail clientFailAt;
-
- /** Constructor. */
- private HandshakeScenario(ServerStageFail serverFailAt, ClientStageFail clientFailAt) {
- this.serverFailAt = serverFailAt;
- this.clientFailAt = clientFailAt;
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString() {
- return String.format("server=%s, client=%s", serverFailAt, clientFailAt);
- }
- }
-
- /**
- * {@link RecoveryServerHandshakeManager} that can fail at specific stage of the handshake.
- */
- static class FailingRecoveryServerHandshakeManager extends RecoveryServerHandshakeManager {
- /**
- * At what stage to fail the handshake.
- */
- private final ServerStageFail failAtStage;
-
- /** Constructor. */
- private FailingRecoveryServerHandshakeManager(
- UUID launchId,
- String consistentId,
- ServerStageFail failAtStage,
- NetworkMessagesFactory messageFactory
- ) {
- super(launchId, consistentId, messageFactory);
- this.failAtStage = failAtStage;
- }
-
- /** {@inheritDoc} */
- @Override
- public HandshakeResult init(Channel channel) {
- if (failAtStage == SERVER_INIT) {
- handshakeFuture().completeExceptionally(new RuntimeException());
- return HandshakeResult.fail();
- }
-
- return super.init(channel);
- }
-
- /** {@inheritDoc} */
- @Override
- public HandshakeResult onConnectionOpen(Channel channel) {
- if (failAtStage == SERVER_CONNECTION_OPENED) {
- handshakeFuture().completeExceptionally(new RuntimeException());
- return HandshakeResult.fail();
- }
-
- return super.onConnectionOpen(channel);
- }
-
- /** {@inheritDoc} */
- @Override
- public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
- if (failAtStage == SERVER_CLIENT_RESPONDED) {
- handshakeFuture().completeExceptionally(new RuntimeException());
- return HandshakeResult.fail();
- }
-
- return super.onMessage(channel, message);
- }
-
- /** Server handshake stage to fail at. */
- enum ServerStageFail {
- /** Don't fail at all. */
- SERVER_DOESNT_FAIL,
-
- /** Fail on init. */
- SERVER_INIT,
-
- /** Fail on connection open. */
- SERVER_CONNECTION_OPENED,
-
- /** Fail on client response. */
- SERVER_CLIENT_RESPONDED
- }
- }
-
- /**
- * {@link RecoveryClientHandshakeManager} that can fail at specific stage of the handshake.
- */
- static class FailingRecoveryClientHandshakeManager extends RecoveryClientHandshakeManager {
- /**
- * At what stage to fail the handshake.
- */
- private final ClientStageFail failAtStage;
-
- /** Constructor. */
- private FailingRecoveryClientHandshakeManager(
- UUID launchId,
- String consistentId,
- ClientStageFail failAtStage,
- NetworkMessagesFactory messageFactory
- ) {
- super(launchId, consistentId, messageFactory);
- this.failAtStage = failAtStage;
- }
-
- /** {@inheritDoc} */
- @Override
- public HandshakeResult init(Channel channel) {
- if (failAtStage == CLIENT_INIT) {
- handshakeFuture().completeExceptionally(new RuntimeException());
- return HandshakeResult.fail();
- }
-
- return super.init(channel);
- }
-
- /** {@inheritDoc} */
- @Override
- public HandshakeResult onConnectionOpen(Channel channel) {
- if (failAtStage == CLIENT_CONNECTION_OPENED) {
- handshakeFuture().completeExceptionally(new RuntimeException());
- return HandshakeResult.fail();
- }
-
- return super.onConnectionOpen(channel);
- }
-
- /** {@inheritDoc} */
- @Override
- public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
- if (failAtStage == CLIENT_SERVER_RESPONDED) {
- handshakeFuture().completeExceptionally(new RuntimeException());
- return HandshakeResult.fail();
- }
-
- return super.onMessage(channel, message);
- }
-
- /** Client handshake stage to fail at. */
- enum ClientStageFail {
- /** Don't fail at all. */
- CLIENT_DOESNT_FAIL,
-
- /** Fail on init. */
- CLIENT_INIT,
-
- /** Fail on connection open. */
- CLIENT_CONNECTION_OPENED,
-
- /** Fail on server response. */
- CLIENT_SERVER_RESPONDED
- }
- }
-
- /**
- * Starts a {@link ConnectionManager} adding it to the {@link #startedManagers} list.
- *
- * @param port Port for the server.
- * @param serverHandshakeFailAt At what stage to fail server handshake.
- * @param clientHandshakeFailAt At what stage to fail client handshake.
- * @return Connection manager.
- */
- private ConnectionManager startManager(
- int port,
- ServerStageFail serverHandshakeFailAt,
- ClientStageFail clientHandshakeFailAt
- ) {
- var registry = new TestMessageSerializationRegistryImpl();
- var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class));
-
- var messageFactory = new NetworkMessagesFactory();
-
- UUID launchId = UUID.randomUUID();
- String consistentId = UUID.randomUUID().toString();
-
- networkConfiguration.port().update(port).join();
-
- NetworkView cfg = networkConfiguration.value();
-
- NettyBootstrapFactory bootstrapFactory = new NettyBootstrapFactory(networkConfiguration, consistentId);
- bootstrapFactory.start();
- startedBootstrapFactories.add(bootstrapFactory);
-
- var manager = new ConnectionManager(
- cfg,
- serializationService,
- consistentId,
- () -> new FailingRecoveryServerHandshakeManager(launchId, consistentId, serverHandshakeFailAt, messageFactory),
- () -> new FailingRecoveryClientHandshakeManager(launchId, consistentId, clientHandshakeFailAt, messageFactory),
- bootstrapFactory
- );
-
- manager.start();
-
- startedManagers.add(manager);
-
- return manager;
- }
-
- /**
- * Starts a {@link ConnectionManager} with a normal handshake manager adding it to the {@link #startedManagers} list.
- *
- * @param port Port.
- * @return Connection manager
- */
- private ConnectionManager startManager(int port) {
- var registry = new TestMessageSerializationRegistryImpl();
- var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class));
-
- var messageFactory = new NetworkMessagesFactory();
-
- UUID launchId = UUID.randomUUID();
- String consistentId = UUID.randomUUID().toString();
-
- networkConfiguration.port().update(port).join();
-
- NetworkView cfg = networkConfiguration.value();
-
- NettyBootstrapFactory bootstrapFactory = new NettyBootstrapFactory(networkConfiguration, consistentId);
- bootstrapFactory.start();
- startedBootstrapFactories.add(bootstrapFactory);
-
- var manager = new ConnectionManager(
- cfg,
- serializationService,
- consistentId,
- () -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
- () -> new RecoveryClientHandshakeManager(launchId, consistentId, messageFactory),
- bootstrapFactory
- );
-
- manager.start();
-
- startedManagers.add(manager);
-
- return manager;
- }
-}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
index ee0df8b92..ac99b87bd 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
@@ -23,6 +23,8 @@ import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
import org.apache.ignite.internal.network.message.InvokeRequest;
import org.apache.ignite.internal.network.message.InvokeResponse;
import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
+import org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
import org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
import org.apache.ignite.network.annotations.MessageGroup;
@@ -57,18 +59,28 @@ public class NetworkMessageTypes {
*/
public static final short HANDSHAKE_START_RESPONSE = 4;
+ /**
+ * Type for {@link HandshakeFinishMessage}.
+ */
+ public static final short HANDSHAKE_FINISH = 5;
+
+ /**
+ * Type for {@link AcknowledgementMessage}.
+ */
+ public static final short ACKNOWLEDGEMENT = 6;
+
/**
* Type for {@link ClassDescriptorMessage}.
*/
- public static final short CLASS_DESCRIPTOR_MESSAGE = 5;
+ public static final short CLASS_DESCRIPTOR_MESSAGE = 7;
/**
* Type for {@link FieldDescriptorMessage}.
*/
- public static final short FIELD_DESCRIPTOR_MESSAGE = 6;
+ public static final short FIELD_DESCRIPTOR_MESSAGE = 8;
/**
* Type for {@link ClassDescriptorListMessage}.
*/
- public static final short CLASS_DESCRIPTOR_LIST_MESSAGE = 7;
+ public static final short CLASS_DESCRIPTOR_LIST_MESSAGE = 9;
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java
index c7d039788..e9de9e6d0 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java
@@ -17,9 +17,8 @@
package org.apache.ignite.internal.network.handshake;
-import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.network.netty.HandshakeHandler;
import org.apache.ignite.internal.network.netty.NettySender;
import org.apache.ignite.network.NetworkMessage;
@@ -28,29 +27,25 @@ import org.apache.ignite.network.NetworkMessage;
*/
public interface HandshakeManager {
/**
- * Initialize handshake manager with the channel.
+ * Initializes the handshake manager.
*
- * @param channel Channel.
- * @return Action to perform by {@link HandshakeHandler}.
+ * @param handlerContext Channel handler context.
*/
- HandshakeResult init(Channel channel);
+ void onInit(ChannelHandlerContext handlerContext);
/**
- * Handle an event of the connection opening.
- *
- * @param channel Channel.
- * @return Action to perform by {@link HandshakeHandler}.
+ * Handles an event of the connection opening.
*/
- HandshakeResult onConnectionOpen(Channel channel);
+ default void onConnectionOpen() {
+ // No-op.
+ }
/**
- * Handle an incoming message.
+ * Handles an incoming message.
*
- * @param channel Channel.
* @param message Message to handle.
- * @return Action to perform by {@link HandshakeHandler}.
*/
- HandshakeResult onMessage(Channel channel, NetworkMessage message);
+ void onMessage(NetworkMessage message);
/**
* Returns future that represents the handshake operation.
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 7c1ae03ed..82f40ab40 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -18,25 +18,25 @@
package org.apache.ignite.internal.network.netty;
import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
-import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.ignite.configuration.schemas.network.NetworkView;
-import org.apache.ignite.configuration.schemas.network.OutboundView;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
@@ -48,6 +48,9 @@ import org.jetbrains.annotations.TestOnly;
* Class that manages connections both incoming and outgoing.
*/
public class ConnectionManager {
+ /** Message factory. */
+ private static final NetworkMessagesFactory FACTORY = new NetworkMessagesFactory();
+
/** Logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(ConnectionManager.class);
@@ -60,6 +63,7 @@ public class ConnectionManager {
/** Server. */
private final NettyServer server;
+ // TODO: IGNITE-16948 Should be a map consistentId -> connectionId -> sender
/** Channels map from consistentId to {@link NettySender}. */
private final Map<String, NettySender> channels = new ConcurrentHashMap<>();
@@ -75,8 +79,8 @@ public class ConnectionManager {
/** Node consistent id. */
private final String consistentId;
- /** Client handshake manager factory. */
- private final Supplier<HandshakeManager> clientHandshakeManagerFactory;
+ /** Node launch id. As opposed to {@link #consistentId}, this identifier changes between restarts. */
+ private final UUID launchId;
/** Start flag. */
private final AtomicBoolean started = new AtomicBoolean(false);
@@ -84,31 +88,32 @@ public class ConnectionManager {
/** Stop flag. */
private final AtomicBoolean stopped = new AtomicBoolean(false);
+ /** Recovery descriptor provider. */
+ private final RecoveryDescriptorProvider descriptorProvider = new DefaultRecoveryDescriptorProvider();
+
/**
* Constructor.
*
* @param networkConfiguration Network configuration.
* @param serializationService Serialization service.
+ * @param launchId Launch id of this node.
* @param consistentId Consistent id of this node.
- * @param serverHandshakeManagerFactory Server handshake manager factory.
- * @param clientHandshakeManagerFactory Client handshake manager factory.
* @param bootstrapFactory Bootstrap factory.
*/
public ConnectionManager(
NetworkView networkConfiguration,
SerializationService serializationService,
+ UUID launchId,
String consistentId,
- Supplier<HandshakeManager> serverHandshakeManagerFactory,
- Supplier<HandshakeManager> clientHandshakeManagerFactory,
NettyBootstrapFactory bootstrapFactory
) {
this.serializationService = serializationService;
+ this.launchId = launchId;
this.consistentId = consistentId;
- this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
this.server = new NettyServer(
networkConfiguration,
- serverHandshakeManagerFactory,
+ this::createServerHandshakeManager,
this::onNewIncomingChannel,
this::onMessage,
serializationService,
@@ -137,7 +142,7 @@ public class ConnectionManager {
server.start().get();
- LOG.info("Connection created [address=" + server.address() + ']');
+ LOG.info("Server started [address=" + server.address() + ']');
} catch (ExecutionException e) {
Throwable cause = e.getCause();
throw new IgniteInternalException("Failed to start the connection manager: " + cause.getMessage(), cause);
@@ -180,10 +185,11 @@ public class ConnectionManager {
// Get an existing client or create a new one. NettyClient provides a CompletableFuture that resolves
// when the client is ready for write operations, so previously started client, that didn't establish connection
- // or didn't perform the handhsake operaton, can be reused.
+ // or didn't perform the handshake operation, can be reused.
+ // TODO: IGNITE-16948 Connection id may be different from 0
NettyClient client = clients.compute(address, (addr, existingClient) ->
existingClient != null && !existingClient.failedToConnect() && !existingClient.isDisconnected()
- ? existingClient : connect(addr)
+ ? existingClient : connect(addr, (short) 0)
);
CompletableFuture<NettySender> sender = client.sender();
@@ -221,11 +227,11 @@ public class ConnectionManager {
* @param address Target address.
* @return New netty client.
*/
- private NettyClient connect(SocketAddress address) {
+ private NettyClient connect(SocketAddress address, short connectionId) {
var client = new NettyClient(
address,
serializationService,
- clientHandshakeManagerFactory.get(),
+ createClientHandshakeManager(connectionId),
this::onMessage
);
@@ -284,6 +290,14 @@ public class ConnectionManager {
return stopped.get();
}
+ private HandshakeManager createClientHandshakeManager(short connectionId) {
+ return new RecoveryClientHandshakeManager(launchId, consistentId, connectionId, FACTORY, descriptorProvider);
+ }
+
+ private HandshakeManager createServerHandshakeManager() {
+ return new RecoveryServerHandshakeManager(launchId, consistentId, FACTORY, descriptorProvider);
+ }
+
/**
* Returns connection manager's {@link #server}.
*
@@ -314,7 +328,6 @@ public class ConnectionManager {
return Collections.unmodifiableCollection(clients.values());
}
-
/**
* Returns map of the channels.
*
@@ -324,27 +337,4 @@ public class ConnectionManager {
public Map<String, NettySender> channels() {
return Collections.unmodifiableMap(channels);
}
-
- /**
- * Creates a {@link Bootstrap} for clients with channel options provided by a {@link OutboundView}.
- *
- * @param eventLoopGroup Event loop group for channel handling.
- * @param clientConfiguration Client configuration.
- * @return Bootstrap for clients.
- */
- public static Bootstrap createClientBootstrap(
- EventLoopGroup eventLoopGroup,
- OutboundView clientConfiguration
- ) {
- Bootstrap clientBootstrap = new Bootstrap();
-
- clientBootstrap.group(eventLoopGroup)
- .channel(NioSocketChannel.class)
- // See NettyServer#start for netty configuration details.
- .option(ChannelOption.SO_KEEPALIVE, clientConfiguration.soKeepAlive())
- .option(ChannelOption.SO_LINGER, clientConfiguration.soLinger())
- .option(ChannelOption.TCP_NODELAY, clientConfiguration.tcpNoDelay());
-
- return clientBootstrap;
- }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java
new file mode 100644
index 000000000..3e900af6c
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Default implementation of the {@link RecoveryDescriptorProvider}.
+ */
+public class DefaultRecoveryDescriptorProvider implements RecoveryDescriptorProvider {
+ // TODO: IGNITE-16954 Make this configurable
+ private static final int DEFAULT_QUEUE_LIMIT = 10;
+
+ /** Recovery descriptors. */
+ private final Map<ChannelKey, RecoveryDescriptor> recoveryDescriptors = new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override
+ public RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID launchId, short connectionIndex, boolean inbound) {
+ var key = new ChannelKey(consistentId, launchId, connectionIndex, inbound);
+
+ return recoveryDescriptors.computeIfAbsent(key, channelKey -> new RecoveryDescriptor(DEFAULT_QUEUE_LIMIT));
+ }
+
+ /** Channel key. */
+ private static class ChannelKey {
+ /** Remote node's consistent id. */
+ private final String consistentId;
+
+ /** Remote node's launch id. */
+ private final UUID launchId;
+
+ /**
+ * Connection id. Every connection between this node and a remote node has a unique connection id,
+ * but connections with different nodes may have the same ids.
+ */
+ private final short connectionId;
+
+ /** {@code true} if channel is inbound, {@code false} otherwise. */
+ private final boolean inbound;
+
+ private ChannelKey(String consistentId, UUID launchId, short connectionId, boolean inbound) {
+ this.consistentId = consistentId;
+ this.launchId = launchId;
+ this.connectionId = connectionId;
+ this.inbound = inbound;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ChannelKey that = (ChannelKey) o;
+
+ if (connectionId != that.connectionId) {
+ return false;
+ }
+ if (inbound != that.inbound) {
+ return false;
+ }
+ if (!consistentId.equals(that.consistentId)) {
+ return false;
+ }
+ return launchId.equals(that.launchId);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int hashCode() {
+ int result = consistentId.hashCode();
+ result = 31 * result + launchId.hashCode();
+ result = 31 * result + (int) connectionId;
+ result = 31 * result + (inbound ? 1 : 0);
+ return result;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ return S.toString(ChannelKey.class, this);
+ }
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
index 61088d049..67fb032ed 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
@@ -19,11 +19,10 @@ package org.apache.ignite.internal.network.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
-import java.util.function.Function;
-import org.apache.ignite.internal.network.handshake.HandshakeAction;
+import java.util.function.Consumer;
import org.apache.ignite.internal.network.handshake.HandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
-import org.apache.ignite.internal.network.handshake.HandshakeResult;
+import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.NetworkMessage;
@@ -31,37 +30,43 @@ import org.apache.ignite.network.NetworkMessage;
* Netty handler of the handshake operation.
*/
public class HandshakeHandler extends ChannelInboundHandlerAdapter {
+ /** Handler name. */
+ public static final String NAME = "handshake-handler";
+
/** Logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(HandshakeHandler.class);
/** Handshake manager. */
private final HandshakeManager manager;
- /** Function that creates a handler that substitutes this handshake handler after the handshake is done. */
- private final Function<String, ChannelInboundHandlerAdapter> newHandlerCreator;
+ private final Consumer<InNetworkObject> messageListener;
+
+ private final PerSessionSerializationService serializationService;
/**
* Constructor.
*
* @param manager Handshake manager.
+ * @param messageListener Message listener.
+ * @param serializationService Serialization service.
*/
- public HandshakeHandler(HandshakeManager manager, Function<String, ChannelInboundHandlerAdapter> newHandlerCreator) {
+ public HandshakeHandler(HandshakeManager manager, Consumer<InNetworkObject> messageListener,
+ PerSessionSerializationService serializationService) {
this.manager = manager;
- this.newHandlerCreator = newHandlerCreator;
+ this.messageListener = messageListener;
+ this.serializationService = serializationService;
}
/** {@inheritDoc} */
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
- HandshakeResult handshakeResult = manager.init(ctx.channel());
-
- handleHandshakeAction(handshakeResult, ctx);
+ manager.onInit(ctx);
}
/** {@inheritDoc} */
@Override
public void channelActive(ChannelHandlerContext ctx) {
- HandshakeResult handshakeResult = manager.onConnectionOpen(ctx.channel());
+ manager.onConnectionOpen();
manager.handshakeFuture().whenComplete((unused, throwable) -> {
if (throwable != null) {
@@ -71,18 +76,13 @@ public class HandshakeHandler extends ChannelInboundHandlerAdapter {
}
});
- handleHandshakeAction(handshakeResult, ctx);
-
ctx.fireChannelActive();
}
/** {@inheritDoc} */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
- HandshakeResult handshakeResult = manager.onMessage(ctx.channel(), (NetworkMessage) msg);
-
- handleHandshakeAction(handshakeResult, ctx);
- // No need to forward the message to the next handler as this message only matters for a handshake.
+ manager.onMessage((NetworkMessage) msg);
}
/** {@inheritDoc} */
@@ -104,29 +104,12 @@ public class HandshakeHandler extends ChannelInboundHandlerAdapter {
}
/**
- * Handle {@link HandshakeAction}.
+ * Creates a {@link MessageHandler} for the current pipeline.
*
- * @param result Handshake result.
- * @param ctx Netty channel context.
+ * @param remoteConsistentId Remote node's consistent id.
+ * @return Message handler.
*/
- private void handleHandshakeAction(HandshakeResult result, ChannelHandlerContext ctx) {
- HandshakeAction action = result.action();
- switch (action) {
- case REMOVE_HANDLER:
- ChannelInboundHandlerAdapter newHandler = newHandlerCreator.apply(result.consistentId());
- String handlerName = newHandler.getClass().getName();
- ctx.pipeline().replace(this, handlerName, newHandler);
- break;
-
- case FAIL:
- ctx.channel().close();
- break;
-
- case NOOP:
- break;
-
- default:
- break;
- }
+ public MessageHandler createMessageHandler(String remoteConsistentId) {
+ return new MessageHandler(messageListener, remoteConsistentId, serializationService);
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
index cd1bc2b5d..b609f1fcb 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
@@ -37,6 +37,9 @@ import org.apache.ignite.network.serialization.MessageReader;
* Decodes {@link ByteBuf}s into {@link NetworkMessage}s.
*/
public class InboundDecoder extends ByteToMessageDecoder {
+ /** Handler name. */
+ public static final String NAME = "inbound-decoder";
+
/** Logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(InboundDecoder.class);
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java
new file mode 100644
index 000000000..43b6685bb
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.util.Collections;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * Inbound handler that handles incoming acknowledgement messages and sends acknowledgement messages for other messages.
+ */
+public class InboundRecoveryHandler extends ChannelInboundHandlerAdapter {
+ /** Handler name. */
+ public static final String NAME = "inbound-recovery-handler";
+
+ /** Recovery descriptor. */
+ private final RecoveryDescriptor descriptor;
+
+ /** Messages factory. */
+ private final NetworkMessagesFactory factory;
+
+ /**
+ * Constructor.
+ *
+ * @param descriptor Recovery descriptor.
+ * @param factory Message factory.
+ */
+ public InboundRecoveryHandler(RecoveryDescriptor descriptor, NetworkMessagesFactory factory) {
+ this.descriptor = descriptor;
+ this.factory = factory;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ NetworkMessage message = (NetworkMessage) msg;
+
+ if (message instanceof AcknowledgementMessage) {
+ AcknowledgementMessage ackMessage = (AcknowledgementMessage) msg;
+ long receivedMessages = ackMessage.receivedMessages();
+
+ descriptor.acknowledge(receivedMessages);
+ } else if (message.needAck()) {
+ AcknowledgementMessage ackMsg = factory.acknowledgementMessage()
+ .receivedMessages(descriptor.onReceive()).build();
+
+ ctx.channel().writeAndFlush(new OutNetworkObject(ackMsg, Collections.emptyList(), false));
+ }
+
+ super.channelRead(ctx, message);
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/IoExceptionSuppressingHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/IoExceptionSuppressingHandler.java
index 5cbe57e6a..0ebd8ea50 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/IoExceptionSuppressingHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/IoExceptionSuppressingHandler.java
@@ -26,6 +26,9 @@ import org.apache.ignite.lang.IgniteLogger;
* Netty handler for suppressing IO exceptions that can happen if a remote peer abruptly closes the connection.
*/
class IoExceptionSuppressingHandler extends ChannelInboundHandlerAdapter {
+ /** Handler name. */
+ public static final String NAME = "io-exception-suppressing-handler";
+
/** Logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(IoExceptionSuppressingHandler.class);
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
index 0cc13f7a1..164c0b475 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.network.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.function.Consumer;
+import org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
import org.apache.ignite.network.NetworkMessage;
@@ -27,6 +28,9 @@ import org.apache.ignite.network.NetworkMessage;
* Network message handler that delegates handling to {@link #messageListener}.
*/
public class MessageHandler extends ChannelInboundHandlerAdapter {
+ /** Handler name. */
+ public static final String NAME = "message-handler";
+
/** Message listener. */
private final Consumer<InNetworkObject> messageListener;
@@ -54,6 +58,10 @@ public class MessageHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
NetworkMessage message = (NetworkMessage) msg;
+ if (message instanceof AcknowledgementMessage) {
+ return;
+ }
+
messageListener.accept(new InNetworkObject(message, consistentId, serializationService.compositeDescriptorRegistry()));
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
index 7983e4001..fb2698de8 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
@@ -21,7 +21,6 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.stream.ChunkedWriteHandler;
import java.net.SocketAddress;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
@@ -110,15 +109,7 @@ public class NettyClient {
public void initChannel(SocketChannel ch) {
var sessionSerializationService = new PerSessionSerializationService(serializationService);
- ch.pipeline().addLast(
- new InboundDecoder(sessionSerializationService),
- new HandshakeHandler(handshakeManager,
- (consistentId) -> new MessageHandler(messageListener, consistentId, sessionSerializationService)
- ),
- new ChunkedWriteHandler(),
- new OutboundEncoder(sessionSerializationService),
- new IoExceptionSuppressingHandler()
- );
+ PipelineUtils.setup(ch.pipeline(), sessionSerializationService, handshakeManager, messageListener);
}
});
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
index 7050d8e7b..1a2b8a0cd 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
@@ -23,7 +23,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.stream.ChunkedWriteHandler;
import java.net.SocketAddress;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
@@ -130,25 +129,7 @@ public class NettyServer {
// Get handshake manager for the new channel.
HandshakeManager manager = handshakeManager.get();
- ch.pipeline().addLast(
- /*
- * Decoder that uses the MessageReader
- * to read chunked data.
- */
- new InboundDecoder(sessionSerializationService),
- // Handshake handler.
- new HandshakeHandler(manager,
- (consistentId) -> new MessageHandler(messageListener, consistentId, sessionSerializationService)
- ),
- /*
- * Encoder that uses the MessageWriter
- * to write chunked data.
- */
- new ChunkedWriteHandler(),
- // Converts NetworkMessage to a ChunkedNetworkMessageInput
- new OutboundEncoder(sessionSerializationService),
- new IoExceptionSuppressingHandler()
- );
+ PipelineUtils.setup(ch.pipeline(), sessionSerializationService, manager, messageListener);
manager.handshakeFuture().thenAccept(newConnectionListener);
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
index f31b4d754..137d6dc6d 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
@@ -38,6 +38,9 @@ import org.apache.ignite.network.serialization.MessageSerializer;
* An encoder for the outbound messages that uses {@link DirectMessageWriter}.
*/
public class OutboundEncoder extends MessageToMessageEncoder<OutNetworkObject> {
+ /** Handler name. */
+ public static final String NAME = "outbound-encoder";
+
private static final NetworkMessagesFactory MSG_FACTORY = new NetworkMessagesFactory();
/** Serialization registry. */
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundRecoveryHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundRecoveryHandler.java
new file mode 100644
index 000000000..7a4037863
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundRecoveryHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.network.OutNetworkObject;
+
+/** Outbound handler that adds outgoing message to the recovery descriptor. */
+public class OutboundRecoveryHandler extends ChannelOutboundHandlerAdapter {
+ /** Handler name. */
+ public static final String NAME = "outbound-recovery-handler";
+
+ /** Recovery descriptor. */
+ private final RecoveryDescriptor descriptor;
+
+ /**
+ * Constructor.
+ *
+ * @param descriptor Recovery descriptor.
+ */
+ public OutboundRecoveryHandler(RecoveryDescriptor descriptor) {
+ this.descriptor = descriptor;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ OutNetworkObject outNetworkObject = (OutNetworkObject) msg;
+
+ if (outNetworkObject.shouldBeSavedForRecovery()) {
+ descriptor.add(outNetworkObject);
+ }
+
+ super.write(ctx, msg, promise);
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java
new file mode 100644
index 000000000..6c2ba6799
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
+
+/** Pipeline utils. */
+public class PipelineUtils {
+ /** {@link ChunkedWriteHandler}'s name. */
+ private static final String CHUNKED_WRITE_HANDLER_NAME = "chunked-write-handler";
+
+ /**
+ * Sets up initial pipeline.
+ *
+ * @param pipeline Channel pipeline.
+ * @param serializationService Serialization service.
+ * @param handshakeManager Handshake manager.
+ * @param messageListener Message listener.
+ */
+ public static void setup(ChannelPipeline pipeline, PerSessionSerializationService serializationService,
+ HandshakeManager handshakeManager, Consumer<InNetworkObject> messageListener) {
+ pipeline.addLast(InboundDecoder.NAME, new InboundDecoder(serializationService));
+ pipeline.addLast(HandshakeHandler.NAME, new HandshakeHandler(handshakeManager, messageListener, serializationService));
+ pipeline.addLast(CHUNKED_WRITE_HANDLER_NAME, new ChunkedWriteHandler());
+ pipeline.addLast(OutboundEncoder.NAME, new OutboundEncoder(serializationService));
+ pipeline.addLast(IoExceptionSuppressingHandler.NAME, new IoExceptionSuppressingHandler());
+ }
+
+ /**
+ * Changes pipeline after the handshake.
+ *
+ * @param pipeline Pipeline.
+ * @param descriptor Recovery descriptor.
+ * @param messageHandler Message handler.
+ * @param factory Message factory.
+ */
+ public static void afterHandshake(
+ ChannelPipeline pipeline,
+ RecoveryDescriptor descriptor,
+ MessageHandler messageHandler,
+ NetworkMessagesFactory factory
+ ) {
+ pipeline.addAfter(OutboundEncoder.NAME, OutboundRecoveryHandler.NAME, new OutboundRecoveryHandler(descriptor));
+ pipeline.addBefore(HandshakeHandler.NAME, InboundRecoveryHandler.NAME, new InboundRecoveryHandler(descriptor, factory));
+ pipeline.addAfter(HandshakeHandler.NAME, MessageHandler.NAME, messageHandler);
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index 2a5cf7a48..c4d030bb7 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -19,19 +19,25 @@ package org.apache.ignite.internal.network.recovery;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
import java.util.Collections;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.handshake.HandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
-import org.apache.ignite.internal.network.handshake.HandshakeResult;
+import org.apache.ignite.internal.network.netty.HandshakeHandler;
+import org.apache.ignite.internal.network.netty.MessageHandler;
import org.apache.ignite.internal.network.netty.NettySender;
import org.apache.ignite.internal.network.netty.NettyUtils;
+import org.apache.ignite.internal.network.netty.PipelineUtils;
+import org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
import org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.OutNetworkObject;
+import org.jetbrains.annotations.TestOnly;
/**
* Recovery protocol handshake manager for a client.
@@ -43,63 +49,113 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
/** Consistent id. */
private final String consistentId;
+ /** Message factory. */
+ private final NetworkMessagesFactory messageFactory;
+
+ /** Recovery descriptor provider. */
+ private final RecoveryDescriptorProvider recoveryDescriptorProvider;
+
+ /** Connection id. */
+ private final short connectionId;
+
/** Handshake completion future. */
private final CompletableFuture<NettySender> handshakeCompleteFuture = new CompletableFuture<>();
- /** Message factory. */
- private final NetworkMessagesFactory messageFactory;
+ /** Remote node's launch id. */
+ private UUID remoteLaunchId;
+
+ /** Remote node's consistent id. */
+ private String remoteConsistentId;
+
+ /** Netty pipeline channel handler context. */
+ private ChannelHandlerContext ctx;
+
+ /** Channel. */
+ private Channel channel;
+
+ /** Netty pipeline handshake handler. */
+ private HandshakeHandler handler;
+
+ /** Recovery descriptor. */
+ private RecoveryDescriptor recoveryDescriptor;
/**
* Constructor.
*
- * @param launchId Launch id.
- * @param consistentId Consistent id.
+ * @param launchId Launch id.
+ * @param consistentId Consistent id.
* @param messageFactory Message factory.
+ * @param recoveryDescriptorProvider Recovery descriptor provider.
*/
public RecoveryClientHandshakeManager(
- UUID launchId, String consistentId, NetworkMessagesFactory messageFactory
- ) {
+ UUID launchId, String consistentId, short connectionId, NetworkMessagesFactory messageFactory,
+ RecoveryDescriptorProvider recoveryDescriptorProvider) {
this.launchId = launchId;
this.consistentId = consistentId;
+ this.connectionId = connectionId;
this.messageFactory = messageFactory;
+ this.recoveryDescriptorProvider = recoveryDescriptorProvider;
}
/** {@inheritDoc} */
@Override
- public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
+ public void onInit(ChannelHandlerContext handlerContext) {
+ this.ctx = handlerContext;
+ this.channel = handlerContext.channel();
+ this.handler = (HandshakeHandler) ctx.handler();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onMessage(NetworkMessage message) {
if (message instanceof HandshakeStartMessage) {
HandshakeStartMessage msg = (HandshakeStartMessage) message;
- UUID remoteLaunchId = msg.launchId();
- String remoteConsistentId = msg.consistentId();
+ this.remoteLaunchId = msg.launchId();
+ this.remoteConsistentId = msg.consistentId();
- HandshakeStartResponseMessage response = messageFactory.handshakeStartResponseMessage()
- .launchId(launchId)
- .consistentId(consistentId)
- .receivedCount(0)
- .connectionsCount(0)
- .build();
+ this.recoveryDescriptor = recoveryDescriptorProvider.getRecoveryDescriptor(remoteConsistentId, remoteLaunchId, connectionId,
+ false);
- ChannelFuture sendFuture = channel.writeAndFlush(new OutNetworkObject(response, Collections.emptyList()));
+ handshake(recoveryDescriptor);
- NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
- if (throwable != null) {
- handshakeCompleteFuture.completeExceptionally(
- new HandshakeException("Failed to send handshake response: " + throwable.getMessage(), throwable)
- );
- } else {
- handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId));
- }
- });
+ return;
+ }
+
+ assert recoveryDescriptor != null : "Wrong client handshake flow";
+
+ if (message instanceof HandshakeFinishMessage) {
+ HandshakeFinishMessage msg = (HandshakeFinishMessage) message;
+ long receivedCount = msg.receivedCount();
+
+ recoveryDescriptor.acknowledge(receivedCount);
+
+ int cnt = recoveryDescriptor.unacknowledgedCount();
+
+ if (cnt == 0) {
+ finishHandshake();
+
+ return;
+ }
- return HandshakeResult.removeHandler(remoteLaunchId, remoteConsistentId);
+ List<OutNetworkObject> networkMessages = recoveryDescriptor.unacknowledgedMessages();
+
+ for (OutNetworkObject networkMessage : networkMessages) {
+ channel.write(networkMessage);
+ }
+
+ channel.flush();
+
+ return;
}
- handshakeCompleteFuture.completeExceptionally(
- new HandshakeException("Unexpected message during handshake: " + message.toString())
- );
+ int cnt = recoveryDescriptor.unacknowledgedCount();
- return HandshakeResult.fail();
+ if (cnt == 0) {
+ finishHandshake();
+ }
+
+ ctx.fireChannelRead(message);
}
/** {@inheritDoc} */
@@ -108,15 +164,48 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
return handshakeCompleteFuture;
}
- /** {@inheritDoc} */
- @Override
- public HandshakeResult init(Channel channel) {
- return HandshakeResult.noOp();
+ private void handshake(RecoveryDescriptor descriptor) {
+ PipelineUtils.afterHandshake(ctx.pipeline(), descriptor, createMessageHandler(), messageFactory);
+
+ HandshakeStartResponseMessage response = messageFactory.handshakeStartResponseMessage()
+ .launchId(launchId)
+ .consistentId(consistentId)
+ .receivedCount(descriptor.receivedCount())
+ .connectionId(connectionId)
+ .build();
+
+ ChannelFuture sendFuture = ctx.channel().writeAndFlush(new OutNetworkObject(response, Collections.emptyList(), false));
+
+ NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
+ if (throwable != null) {
+ handshakeCompleteFuture.completeExceptionally(
+ new HandshakeException("Failed to send handshake response: " + throwable.getMessage(), throwable)
+ );
+ }
+ });
}
- /** {@inheritDoc} */
- @Override
- public HandshakeResult onConnectionOpen(Channel channel) {
- return HandshakeResult.noOp();
+ /**
+ * Creates a message handler using the consistent id of a remote node.
+ *
+ * @return New message handler.
+ */
+ private MessageHandler createMessageHandler() {
+ return handler.createMessageHandler(remoteConsistentId);
+ }
+
+ /**
+ * Finishes handshaking process by removing handshake handler from the pipeline and creating a {@link NettySender}.
+ */
+ private void finishHandshake() {
+ // Removes handshake handler from the pipeline as the handshake is finished
+ this.ctx.pipeline().remove(this.handler);
+
+ handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId));
+ }
+
+ @TestOnly
+ public RecoveryDescriptor recoveryDescriptor() {
+ return recoveryDescriptor;
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java
new file mode 100644
index 000000000..b8a73a0a8
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * Recovery protocol descriptor.
+ */
+public class RecoveryDescriptor {
+ /** Unacknowledged messages. */
+ private final Queue<OutNetworkObject> unacknowledgedMessages;
+
+ /** Count of sent messages. */
+ private long sentCount;
+
+ /** Count of acknowledged sent messages. */
+ private long acknowledgedCount;
+
+ /** Count of received messages. */
+ private long receivedCount;
+
+ /**
+ * Constructor.
+ *
+ * @param queueLimit Maximum size of unacknowledged messages queue.
+ */
+ public RecoveryDescriptor(int queueLimit) {
+ this.unacknowledgedMessages = new ArrayDeque<>(queueLimit);
+ }
+
+ /**
+ * Returns count of received messages.
+ *
+ * @return Count of received messages.
+ */
+ public long receivedCount() {
+ return receivedCount;
+ }
+
+ /**
+ * Acknowledges that sent messages were received by the remote node.
+ *
+ * @param messagesReceivedByRemote Number of all messages received by the remote node.
+ */
+ public void acknowledge(long messagesReceivedByRemote) {
+ while (acknowledgedCount < messagesReceivedByRemote) {
+ OutNetworkObject req = unacknowledgedMessages.poll();
+
+ assert req != null;
+
+ acknowledgedCount++;
+ }
+ }
+
+ /**
+ * Returns the number of the messages unacknowledged by the remote node.
+ *
+ * @return The number of the messages unacknowledged by the remote node.
+ */
+ public int unacknowledgedCount() {
+ long res = sentCount - acknowledgedCount;
+ int size = unacknowledgedMessages.size();
+
+ assert res >= 0;
+ assert res == size;
+
+ return size;
+ }
+
+ /**
+ * Returns unacknowledged messages.
+ *
+ * @return Unacknowledged messages.
+ */
+ public List<OutNetworkObject> unacknowledgedMessages() {
+ return new ArrayList<>(unacknowledgedMessages);
+ }
+
+ /**
+ * Adds a sent message.
+ *
+ * @param msg Message.
+ */
+ public void add(OutNetworkObject msg) {
+ msg.shouldBeSavedForRecovery(false);
+ sentCount++;
+ unacknowledgedMessages.add(msg);
+ }
+
+ /**
+ * Handles the event of receiving a new message.
+ *
+ * @return Number of received messages.
+ */
+ public long onReceive() {
+ receivedCount++;
+
+ return receivedCount;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ return S.toString(RecoveryDescriptor.class, this);
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorProvider.java
similarity index 58%
copy from modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
copy to modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorProvider.java
index fc429d7cc..ff3f20406 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorProvider.java
@@ -15,29 +15,22 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.network.recovery.message;
+package org.apache.ignite.internal.network.recovery;
import java.util.UUID;
-import org.apache.ignite.internal.network.NetworkMessageTypes;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
/**
- * Handshake start message.
+ * Interface that provides recovery descriptors for channels between two nodes.
*/
-@Transferable(NetworkMessageTypes.HANDSHAKE_START)
-public interface HandshakeStartMessage extends NetworkMessage {
+public interface RecoveryDescriptorProvider {
/**
- * Returns launch id.
+ * Returns a {@link RecoveryDescriptor}.
*
- * @return Launch id.
+ * @param consistentId Remote node consistent id.
+ * @param launchId Remote node launch id.
+ * @param connectionIndex Connection id.
+ * @param inbound {@code true} if the connection is inbound, {@code false} otherwise.
+ * @return Recovery descriptor.
*/
- UUID launchId();
-
- /**
- * Returns consistent id.
- *
- * @return Consistent id.
- */
- String consistentId();
+ RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID launchId, short connectionIndex, boolean inbound);
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
index cfe546ecb..17c516817 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
@@ -19,19 +19,25 @@ package org.apache.ignite.internal.network.recovery;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
import java.util.Collections;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.handshake.HandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
-import org.apache.ignite.internal.network.handshake.HandshakeResult;
+import org.apache.ignite.internal.network.netty.HandshakeHandler;
+import org.apache.ignite.internal.network.netty.MessageHandler;
import org.apache.ignite.internal.network.netty.NettySender;
import org.apache.ignite.internal.network.netty.NettyUtils;
+import org.apache.ignite.internal.network.netty.PipelineUtils;
+import org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
import org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.OutNetworkObject;
+import org.jetbrains.annotations.TestOnly;
/**
* Recovery protocol handshake manager for a server.
@@ -43,42 +49,70 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
/** Consistent id. */
private final String consistentId;
+ /** Message factory. */
+ private final NetworkMessagesFactory messageFactory;
+
/** Handshake completion future. */
private final CompletableFuture<NettySender> handshakeCompleteFuture = new CompletableFuture<>();
- /** Message factory. */
- private final NetworkMessagesFactory messageFactory;
+ /** Remote node's launch id. */
+ private UUID remoteLaunchId;
+
+ /** Remote node's consistent id. */
+ private String remoteConsistentId;
+
+ /** Netty pipeline channel handler context. */
+ private ChannelHandlerContext ctx;
+
+ /** Channel. */
+ private Channel channel;
+
+ /** Netty pipeline handshake handler. */
+ private HandshakeHandler handler;
+
+ /** Count of messages received by the remote node. */
+ private long receivedCount;
+
+ /** Recovery descriptor provider. */
+ private final RecoveryDescriptorProvider recoveryDescriptorProvider;
+
+ /** Recovery descriptor. */
+ private RecoveryDescriptor recoveryDescriptor;
/**
* Constructor.
*
- * @param launchId Launch id.
- * @param consistentId Consistent id.
+ * @param launchId Launch id.
+ * @param consistentId Consistent id.
* @param messageFactory Message factory.
+ * @param recoveryDescriptorProvider Recovery descriptor provider.
*/
public RecoveryServerHandshakeManager(
- UUID launchId, String consistentId, NetworkMessagesFactory messageFactory
- ) {
+ UUID launchId, String consistentId, NetworkMessagesFactory messageFactory,
+ RecoveryDescriptorProvider recoveryDescriptorProvider) {
this.launchId = launchId;
this.consistentId = consistentId;
this.messageFactory = messageFactory;
+ this.recoveryDescriptorProvider = recoveryDescriptorProvider;
}
/** {@inheritDoc} */
@Override
- public HandshakeResult init(Channel channel) {
- return HandshakeResult.noOp();
+ public void onInit(ChannelHandlerContext handlerContext) {
+ this.ctx = handlerContext;
+ this.channel = handlerContext.channel();
+ this.handler = (HandshakeHandler) ctx.handler();
}
/** {@inheritDoc} */
@Override
- public HandshakeResult onConnectionOpen(Channel channel) {
+ public void onConnectionOpen() {
HandshakeStartMessage handshakeStartMessage = messageFactory.handshakeStartMessage()
.launchId(launchId)
.consistentId(consistentId)
.build();
- ChannelFuture sendFuture = channel.writeAndFlush(new OutNetworkObject(handshakeStartMessage, Collections.emptyList()));
+ ChannelFuture sendFuture = channel.writeAndFlush(new OutNetworkObject(handshakeStartMessage, Collections.emptyList(), false));
NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
if (throwable != null) {
@@ -87,29 +121,86 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
);
}
});
-
- return HandshakeResult.noOp();
}
/** {@inheritDoc} */
@Override
- public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
+ public void onMessage(NetworkMessage message) {
if (message instanceof HandshakeStartResponseMessage) {
HandshakeStartResponseMessage msg = (HandshakeStartResponseMessage) message;
- UUID remoteLaunchId = msg.launchId();
- String remoteConsistentId = msg.consistentId();
+ this.remoteLaunchId = msg.launchId();
+ this.remoteConsistentId = msg.consistentId();
+ this.receivedCount = msg.receivedCount();
- handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId));
+ this.recoveryDescriptor = recoveryDescriptorProvider.getRecoveryDescriptor(remoteConsistentId, remoteLaunchId,
+ msg.connectionId(), true);
- return HandshakeResult.removeHandler(remoteLaunchId, remoteConsistentId);
+ handshake(recoveryDescriptor);
+
+ return;
+ }
+
+ assert recoveryDescriptor != null : "Wrong server handshake flow";
+
+ if (recoveryDescriptor.unacknowledgedCount() == 0) {
+ finishHandshake();
}
- handshakeCompleteFuture.completeExceptionally(
- new HandshakeException("Unexpected message during handshake: " + message.toString())
+ ctx.fireChannelRead(message);
+ }
+
+ private void handshake(RecoveryDescriptor descriptor) {
+ PipelineUtils.afterHandshake(ctx.pipeline(), descriptor, createMessageHandler(), messageFactory);
+
+ HandshakeFinishMessage response = messageFactory.handshakeFinishMessage()
+ .receivedCount(descriptor.receivedCount())
+ .build();
+
+ CompletableFuture<Void> sendFuture = NettyUtils.toCompletableFuture(
+ channel.write(new OutNetworkObject(response, Collections.emptyList(), false))
);
- return HandshakeResult.fail();
+ descriptor.acknowledge(receivedCount);
+
+ int unacknowledgedCount = descriptor.unacknowledgedCount();
+
+ if (unacknowledgedCount > 0) {
+ var futs = new CompletableFuture[unacknowledgedCount + 1];
+ futs[0] = sendFuture;
+
+ List<OutNetworkObject> networkMessages = descriptor.unacknowledgedMessages();
+
+ for (int i = 0; i < networkMessages.size(); i++) {
+ OutNetworkObject networkMessage = networkMessages.get(i);
+ futs[i + 1] = NettyUtils.toCompletableFuture(channel.write(networkMessage));
+ }
+
+ sendFuture = CompletableFuture.allOf(futs);
+ }
+
+ channel.flush();
+
+ boolean hasUnacknowledgedMessages = unacknowledgedCount > 0;
+
+ sendFuture.whenComplete((unused, throwable) -> {
+ if (throwable != null) {
+ handshakeCompleteFuture.completeExceptionally(
+ new HandshakeException("Failed to send handshake response: " + throwable.getMessage(), throwable)
+ );
+ } else if (!hasUnacknowledgedMessages) {
+ finishHandshake();
+ }
+ });
+ }
+
+ /**
+ * Creates a message handler using the consistent id of a remote node.
+ *
+ * @return New message handler.
+ */
+ private MessageHandler createMessageHandler() {
+ return handler.createMessageHandler(remoteConsistentId);
}
/** {@inheritDoc} */
@@ -117,4 +208,19 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
public CompletableFuture<NettySender> handshakeFuture() {
return handshakeCompleteFuture;
}
+
+ /**
+ * Finishes handshaking process by removing handshake handler from the pipeline and creating a {@link NettySender}.
+ */
+ private void finishHandshake() {
+ // Removes handshake handler from the pipeline as the handshake is finished
+ this.ctx.pipeline().remove(this.handler);
+
+ handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId));
+ }
+
+ @TestOnly
+ public RecoveryDescriptor recoveryDescriptor() {
+ return recoveryDescriptor;
+ }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/AcknowledgementMessage.java
similarity index 67%
copy from modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
copy to modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/AcknowledgementMessage.java
index fc429d7cc..091ad0e1f 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/AcknowledgementMessage.java
@@ -17,27 +17,19 @@
package org.apache.ignite.internal.network.recovery.message;
-import java.util.UUID;
-import org.apache.ignite.internal.network.NetworkMessageTypes;
-import org.apache.ignite.network.NetworkMessage;
+import static org.apache.ignite.internal.network.NetworkMessageTypes.ACKNOWLEDGEMENT;
+
import org.apache.ignite.network.annotations.Transferable;
/**
- * Handshake start message.
+ * Message that holds count of messages received by node.
*/
-@Transferable(NetworkMessageTypes.HANDSHAKE_START)
-public interface HandshakeStartMessage extends NetworkMessage {
- /**
- * Returns launch id.
- *
- * @return Launch id.
- */
- UUID launchId();
-
+@Transferable(ACKNOWLEDGEMENT)
+public interface AcknowledgementMessage extends InternalMessage {
/**
- * Returns consistent id.
+ * Returns count of messages received within specific channel between two nodes.
*
- * @return Consistent id.
+ * @return count of messages received within specific channel between two nodes.
*/
- String consistentId();
+ long receivedMessages();
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeFinishMessage.java
similarity index 66%
copy from modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
copy to modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeFinishMessage.java
index fc429d7cc..f93905ed6 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeFinishMessage.java
@@ -17,27 +17,20 @@
package org.apache.ignite.internal.network.recovery.message;
-import java.util.UUID;
-import org.apache.ignite.internal.network.NetworkMessageTypes;
-import org.apache.ignite.network.NetworkMessage;
+import static org.apache.ignite.internal.network.NetworkMessageTypes.HANDSHAKE_FINISH;
+
import org.apache.ignite.network.annotations.Transferable;
/**
- * Handshake start message.
+ * Handshake finish message, contains the quantity of the received messages.
+ * This message is sent from a server to a client as a response to the {@link HandshakeStartResponseMessage}.
*/
-@Transferable(NetworkMessageTypes.HANDSHAKE_START)
-public interface HandshakeStartMessage extends NetworkMessage {
- /**
- * Returns launch id.
- *
- * @return Launch id.
- */
- UUID launchId();
-
+@Transferable(HANDSHAKE_FINISH)
+public interface HandshakeFinishMessage extends InternalMessage {
/**
- * Returns consistent id.
+ * Returns number of received messages.
*
- * @return Consistent id.
+ * @return Number of received messages.
*/
- String consistentId();
+ long receivedCount();
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
index fc429d7cc..e021516f1 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
@@ -19,14 +19,14 @@ package org.apache.ignite.internal.network.recovery.message;
import java.util.UUID;
import org.apache.ignite.internal.network.NetworkMessageTypes;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
- * Handshake start message.
+ * Handshake start message, contains info about the node.
+ * This message is sent from a server to a client at the connection opening.
*/
@Transferable(NetworkMessageTypes.HANDSHAKE_START)
-public interface HandshakeStartMessage extends NetworkMessage {
+public interface HandshakeStartMessage extends InternalMessage {
/**
* Returns launch id.
*
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartResponseMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartResponseMessage.java
index 77e520758..833ec48d4 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartResponseMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartResponseMessage.java
@@ -19,14 +19,14 @@ package org.apache.ignite.internal.network.recovery.message;
import java.util.UUID;
import org.apache.ignite.internal.network.NetworkMessageTypes;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
- * Handshake start response message.
+ * Handshake start response message, contains info about the node, connection id and the quantity of the received messages.
+ * This message is sent from a client to a server as a response to the {@link HandshakeStartMessage}.
*/
@Transferable(NetworkMessageTypes.HANDSHAKE_START_RESPONSE)
-public interface HandshakeStartResponseMessage extends NetworkMessage {
+public interface HandshakeStartResponseMessage extends InternalMessage {
/**
* Returns launch id.
*
@@ -49,9 +49,9 @@ public interface HandshakeStartResponseMessage extends NetworkMessage {
long receivedCount();
/**
- * Returns connections count.
+ * Returns connection id.
*
- * @return Connections count.
+ * @return Connection id.
*/
- long connectionsCount();
+ short connectionId();
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/InternalMessage.java
similarity index 65%
copy from modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
copy to modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/InternalMessage.java
index fc429d7cc..2d82a1e8b 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/InternalMessage.java
@@ -17,27 +17,15 @@
package org.apache.ignite.internal.network.recovery.message;
-import java.util.UUID;
-import org.apache.ignite.internal.network.NetworkMessageTypes;
import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
/**
- * Handshake start message.
+ * Message that does not need an acknowledgement from the remote node.
*/
-@Transferable(NetworkMessageTypes.HANDSHAKE_START)
-public interface HandshakeStartMessage extends NetworkMessage {
- /**
- * Returns launch id.
- *
- * @return Launch id.
- */
- UUID launchId();
-
- /**
- * Returns consistent id.
- *
- * @return Consistent id.
- */
- String consistentId();
+public interface InternalMessage extends NetworkMessage {
+ /** {@inheritDoc} */
+ @Override
+ default boolean needAck() {
+ return false;
+ }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/OutNetworkObject.java b/modules/network/src/main/java/org/apache/ignite/network/OutNetworkObject.java
index f2e9499b7..786a3fe7b 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/OutNetworkObject.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/OutNetworkObject.java
@@ -30,6 +30,13 @@ public class OutNetworkObject {
/** List of class descriptor messages. */
private final List<ClassDescriptorMessage> descriptors;
+ /**
+ * Flag indicating if this outgoing message should be added to the unacknowledged messages queue of the recovery descriptor.
+ * Acknowledgement message and handshake messages should not have this flag set to {@code true}. After adding the message
+ * to the queue this flag should be set to {@code false}.
+ */
+ private boolean shouldBeSavedForRecovery;
+
/**
* Constructor.
*
@@ -37,8 +44,28 @@ public class OutNetworkObject {
* @param descriptors Class descriptors.
*/
public OutNetworkObject(NetworkMessage networkMessage, List<ClassDescriptorMessage> descriptors) {
+ this(networkMessage, descriptors, true);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param networkMessage Network message.
+ * @param descriptors Class descriptors.
+ * @param shouldBeSavedForRecovery See {@link #shouldBeSavedForRecovery}.
+ */
+ public OutNetworkObject(NetworkMessage networkMessage, List<ClassDescriptorMessage> descriptors, boolean shouldBeSavedForRecovery) {
this.networkMessage = networkMessage;
this.descriptors = descriptors;
+ this.shouldBeSavedForRecovery = shouldBeSavedForRecovery;
+ }
+
+ public boolean shouldBeSavedForRecovery() {
+ return shouldBeSavedForRecovery;
+ }
+
+ public void shouldBeSavedForRecovery(boolean shouldBeSavedForRecovery) {
+ this.shouldBeSavedForRecovery = shouldBeSavedForRecovery;
}
public NetworkMessage networkMessage() {
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index df5ff973a..7daf90272 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -35,8 +35,6 @@ import org.apache.ignite.configuration.schemas.network.NetworkView;
import org.apache.ignite.configuration.schemas.network.ScaleCubeView;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.netty.ConnectionManager;
-import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
-import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
import org.apache.ignite.internal.network.serialization.SerializationService;
@@ -103,9 +101,8 @@ public class ScaleCubeClusterServiceFactory {
connectionMgr = new ConnectionManager(
configView,
serializationService,
+ launchId,
consistentId,
- () -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
- () -> new RecoveryClientHandshakeManager(launchId, consistentId, messageFactory),
nettyBootstrapFactory
);
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
index 54ab77605..95dd327d1 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
@@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -34,7 +35,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
-import org.apache.ignite.internal.network.handshake.HandshakeResult;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.NetworkMessage;
import org.junit.jupiter.api.AfterEach;
@@ -248,8 +248,8 @@ public class NettyClientTest {
/** {@inheritDoc} */
@Override
- public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
- return HandshakeResult.removeHandler(null, null);
+ public void onMessage(NetworkMessage message) {
+ // No-op.
}
/** {@inheritDoc} */
@@ -260,14 +260,8 @@ public class NettyClientTest {
/** {@inheritDoc} */
@Override
- public HandshakeResult init(Channel channel) {
- return HandshakeResult.noOp();
- }
-
- /** {@inheritDoc} */
- @Override
- public HandshakeResult onConnectionOpen(Channel channel) {
- return HandshakeResult.noOp();
+ public void onInit(ChannelHandlerContext ctx) {
+ // No-op.
}
}
}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index 4e84a8980..f07fb5395 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -42,7 +42,6 @@ import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
-import org.apache.ignite.internal.network.handshake.HandshakeResult;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
import org.apache.ignite.lang.IgniteInternalException;
@@ -147,10 +146,6 @@ public class NettyServerTest {
HandshakeManager handshakeManager = mock(HandshakeManager.class);
when(handshakeManager.handshakeFuture()).thenReturn(CompletableFuture.completedFuture(mock(NettySender.class)));
- HandshakeResult noOp = HandshakeResult.noOp();
- when(handshakeManager.init(any())).thenReturn(noOp);
- when(handshakeManager.onConnectionOpen(any())).thenReturn(noOp);
- when(handshakeManager.onMessage(any(), any())).thenReturn(noOp);
MessageSerializationRegistry registry = mock(MessageSerializationRegistry.class);
@@ -220,10 +215,10 @@ public class NettyServerTest {
InOrder order = Mockito.inOrder(handshakeManager);
- order.verify(handshakeManager, timeout()).init(any());
+ order.verify(handshakeManager, timeout()).onInit(any());
order.verify(handshakeManager, timeout()).handshakeFuture();
- order.verify(handshakeManager, timeout()).onConnectionOpen(any());
- order.verify(handshakeManager, timeout()).onMessage(any(), any());
+ order.verify(handshakeManager, timeout()).onConnectionOpen();
+ order.verify(handshakeManager, timeout()).onMessage(any());
}
/**
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
new file mode 100644
index 000000000..e83384a2f
--- /dev/null
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
+import org.apache.ignite.internal.network.serialization.SerializationService;
+import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
+import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.OutNetworkObject;
+import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
+import org.apache.ignite.network.TestMessagesFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Recovery protocol handshake flow test.
+ */
+public class RecoveryHandshakeTest {
+ /** Connection id. */
+ private static final short CONNECTION_ID = 1337;
+
+ /** Serialization registry. */
+ private static final MessageSerializationRegistry MESSAGE_REGISTRY = new TestMessageSerializationRegistryImpl();
+
+ /** Message factory. */
+ private static final NetworkMessagesFactory MESSAGE_FACTORY = new NetworkMessagesFactory();
+
+ /** Test message factory. */
+ private static final TestMessagesFactory TEST_MESSAGES_FACTORY = new TestMessagesFactory();
+
+ @Test
+ public void testHandshake() throws Exception {
+ RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+ RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+ RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);
+ RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(serverRecovery);
+
+ EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, noMessageListener);
+
+ EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, noMessageListener);
+
+ assertTrue(serverSideChannel.isActive());
+
+ exchangeServerToClient(serverSideChannel, clientSideChannel);
+ exchangeClientToServer(serverSideChannel, clientSideChannel);
+ exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+ assertNull(clientSideChannel.readOutbound());
+ assertNull(serverSideChannel.readOutbound());
+
+ checkHandshakeCompleted(serverHandshakeManager);
+ checkHandshakeCompleted(clientHandshakeManager);
+
+ checkPipelineAfterHandshake(serverSideChannel);
+ checkPipelineAfterHandshake(clientSideChannel);
+ }
+
+ @Test
+ public void testHandshakeWithUnacknowledgedServerMessage() throws Exception {
+ RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+ RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+ UUID clientLaunchId = UUID.randomUUID();
+ RecoveryDescriptor serverRecoveryDescriptor = serverRecovery.getRecoveryDescriptor("client", clientLaunchId, CONNECTION_ID, true);
+ addUnacknowledgedMessages(serverRecoveryDescriptor);
+
+ RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager("client", clientLaunchId,
+ clientRecovery);
+ RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(serverRecovery);
+
+ var messageCaptor = new AtomicReference<TestMessage>();
+ EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, (inObject) -> {
+ NetworkMessage msg = inObject.message();
+
+ assertInstanceOf(TestMessage.class, msg);
+
+ messageCaptor.set((TestMessage) msg);
+ });
+
+ EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, noMessageListener);
+
+ assertTrue(serverSideChannel.isActive());
+
+ exchangeServerToClient(serverSideChannel, clientSideChannel);
+ exchangeClientToServer(serverSideChannel, clientSideChannel);
+ exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+ assertNull(clientSideChannel.readOutbound());
+
+ exchangeServerToClient(serverSideChannel, clientSideChannel);
+ assertNull(serverSideChannel.readOutbound());
+
+ TestMessage ackedMessage = messageCaptor.get();
+ assertNotNull(ackedMessage);
+
+ checkHandshakeNotCompleted(serverHandshakeManager);
+ checkHandshakeCompleted(clientHandshakeManager);
+
+ exchangeClientToServer(serverSideChannel, clientSideChannel);
+
+ checkHandshakeCompleted(serverHandshakeManager);
+ checkHandshakeCompleted(clientHandshakeManager);
+
+ checkPipelineAfterHandshake(serverSideChannel);
+ checkPipelineAfterHandshake(clientSideChannel);
+ }
+
+ @Test
+ public void testHandshakeWithUnacknowledgedClientMessage() throws Exception {
+ RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+ RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+ UUID serverLaunchId = UUID.randomUUID();
+ RecoveryDescriptor clientRecoveryDescriptor = clientRecovery.getRecoveryDescriptor("server", serverLaunchId, CONNECTION_ID, false);
+ addUnacknowledgedMessages(clientRecoveryDescriptor);
+
+ RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);
+ RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager("server", serverLaunchId,
+ serverRecovery);
+
+ var messageCaptor = new AtomicReference<TestMessage>();
+ EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, noMessageListener);
+
+ EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, (inObject) -> {
+ NetworkMessage msg = inObject.message();
+
+ assertInstanceOf(TestMessage.class, msg);
+
+ messageCaptor.set((TestMessage) msg);
+ });
+
+ assertTrue(serverSideChannel.isActive());
+
+ exchangeServerToClient(serverSideChannel, clientSideChannel);
+ exchangeClientToServer(serverSideChannel, clientSideChannel);
+ exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+ assertNull(serverSideChannel.readOutbound());
+
+ exchangeClientToServer(serverSideChannel, clientSideChannel);
+ assertNull(clientSideChannel.readOutbound());
+
+ TestMessage ackedMessage = messageCaptor.get();
+ assertNotNull(ackedMessage);
+
+ checkHandshakeCompleted(serverHandshakeManager);
+ checkHandshakeNotCompleted(clientHandshakeManager);
+
+ exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+ checkHandshakeCompleted(serverHandshakeManager);
+ checkHandshakeCompleted(clientHandshakeManager);
+
+ checkPipelineAfterHandshake(serverSideChannel);
+ checkPipelineAfterHandshake(clientSideChannel);
+ }
+
+ @Test
+ public void testPairedRecoveryDescriptors() throws Exception {
+ RecoveryDescriptorProvider node1Recovery = createRecoveryDescriptorProvider();
+ RecoveryDescriptorProvider node2Recovery = createRecoveryDescriptorProvider();
+
+ UUID node1Uuid = UUID.randomUUID();
+ UUID node2Uuid = UUID.randomUUID();
+
+ RecoveryClientHandshakeManager chm1 = createRecoveryClientHandshakeManager("client", node1Uuid, node1Recovery);
+ RecoveryServerHandshakeManager shm1 = createRecoveryServerHandshakeManager("client", node1Uuid, node1Recovery);
+
+ RecoveryClientHandshakeManager chm2 = createRecoveryClientHandshakeManager("server", node2Uuid, node2Recovery);
+ RecoveryServerHandshakeManager shm2 = createRecoveryServerHandshakeManager("server", node2Uuid, node2Recovery);
+
+ EmbeddedChannel out1to2 = setupChannel(chm1, noMessageListener);
+ EmbeddedChannel in1to2 = setupChannel(shm1, noMessageListener);
+ EmbeddedChannel out2to1 = setupChannel(chm2, noMessageListener);
+ EmbeddedChannel in2to1 = setupChannel(shm2, noMessageListener);
+
+ exchangeServerToClient(in1to2, out2to1);
+ exchangeServerToClient(in2to1, out1to2);
+
+ exchangeClientToServer(in1to2, out2to1);
+ exchangeClientToServer(in2to1, out1to2);
+
+ assertNotSame(chm1.recoveryDescriptor(), shm1.recoveryDescriptor());
+ assertNotSame(chm2.recoveryDescriptor(), shm2.recoveryDescriptor());
+ }
+
+ @Test
+ public void testExactlyOnceServer() throws Exception {
+ testExactlyOnce(true);
+ }
+
+ @Test
+ public void testExactlyOnceClient() throws Exception {
+ testExactlyOnce(false);
+ }
+
+ /**
+ * Tests that message was received exactly once in case if network failure during acknowledgement.
+ *
+ * @param serverDidntReceiveAck {@code true} if server didn't receive the acknowledgement, {@code false} if client didn't receive
+ * the acknowledgement.
+ * @throws Exception If failed.
+ */
+ private void testExactlyOnce(boolean serverDidntReceiveAck) throws Exception {
+ var server = "server";
+ UUID serverLaunchId = UUID.randomUUID();
+ var client = "client";
+ UUID clientLaunchId = UUID.randomUUID();
+
+ RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+ RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+ RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(client, clientLaunchId,
+ clientRecovery);
+ RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(server, serverLaunchId,
+ serverRecovery);
+
+ var receivedFirst = new AtomicBoolean();
+
+ var listener1 = new MessageListener("1", receivedFirst);
+
+ EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, serverDidntReceiveAck ? listener1 : noMessageListener);
+ EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, serverDidntReceiveAck ? noMessageListener : listener1);
+
+ // Normal handshake
+ exchangeServerToClient(serverSideChannel, clientSideChannel);
+ exchangeClientToServer(serverSideChannel, clientSideChannel);
+ exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+ EmbeddedChannel ch = serverDidntReceiveAck ? serverSideChannel : clientSideChannel;
+
+ // Add two messages to the outbound
+ ch.writeOutbound(new OutNetworkObject(TEST_MESSAGES_FACTORY.testMessage().msg("1").build(), Collections.emptyList()));
+ ch.writeOutbound(new OutNetworkObject(TEST_MESSAGES_FACTORY.testMessage().msg("2").build(), Collections.emptyList()));
+
+ // Send one of the messages
+ if (serverDidntReceiveAck) {
+ exchangeServerToClient(serverSideChannel, clientSideChannel);
+ } else {
+ exchangeClientToServer(serverSideChannel, clientSideChannel);
+ }
+
+ // Message should be received
+ assertTrue(receivedFirst.get());
+
+ // Transfer only one acknowledgement, don't transfer the second one (simulates network failure on acknowledgement)
+ if (serverDidntReceiveAck) {
+ exchangeClientToServer(serverSideChannel, clientSideChannel);
+ } else {
+ exchangeServerToClient(serverSideChannel, clientSideChannel);
+ }
+
+ // Simulate reconnection
+ clientHandshakeManager = createRecoveryClientHandshakeManager(client, clientLaunchId, clientRecovery);
+ serverHandshakeManager = createRecoveryServerHandshakeManager(server, serverLaunchId, serverRecovery);
+
+ var receivedSecond = new AtomicBoolean();
+
+ var listener2 = new MessageListener("2", receivedSecond);
+
+ clientSideChannel = setupChannel(clientHandshakeManager, serverDidntReceiveAck ? listener2 : noMessageListener);
+ serverSideChannel = setupChannel(serverHandshakeManager, serverDidntReceiveAck ? noMessageListener : listener2);
+
+ // Handshake
+ exchangeServerToClient(serverSideChannel, clientSideChannel);
+ exchangeClientToServer(serverSideChannel, clientSideChannel);
+ exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+ // Resending message
+ if (serverDidntReceiveAck) {
+ exchangeServerToClient(serverSideChannel, clientSideChannel);
+ } else {
+ exchangeClientToServer(serverSideChannel, clientSideChannel);
+ }
+
+ // Send another acknowledgement
+ if (serverDidntReceiveAck) {
+ exchangeClientToServer(serverSideChannel, clientSideChannel);
+ } else {
+ exchangeServerToClient(serverSideChannel, clientSideChannel);
+ }
+
+ assertNull(serverSideChannel.readOutbound());
+ assertNull(clientSideChannel.readOutbound());
+
+ assertTrue(receivedSecond.get());
+ }
+
+ /** Message listener that accepts a specific message only once. */
+ private static class MessageListener implements Consumer<InNetworkObject> {
+ /** Expected message. */
+ private final String expectedMessage;
+
+ /** Flag indicating that expected messages was received. */
+ private final AtomicBoolean flag;
+
+ private MessageListener(String expectedMessage, AtomicBoolean flag) {
+ this.expectedMessage = expectedMessage;
+ this.flag = flag;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void accept(InNetworkObject inNetworkObject) {
+ TestMessage msg = (TestMessage) inNetworkObject.message();
+ if (expectedMessage.equals(msg.msg())) {
+ if (!flag.compareAndSet(false, true)) {
+ fail();
+ }
+ return;
+ }
+ fail();
+ }
+ }
+
+ private void checkPipelineAfterHandshake(EmbeddedChannel channel) {
+ assertNull(channel.pipeline().get(HandshakeHandler.NAME));
+ }
+
+ private void checkHandshakeNotCompleted(HandshakeManager manager) {
+ CompletableFuture<NettySender> handshakeFuture = manager.handshakeFuture();
+ assertFalse(handshakeFuture.isDone());
+ assertFalse(handshakeFuture.isCompletedExceptionally());
+ assertFalse(handshakeFuture.isCancelled());
+ }
+
+ private void checkHandshakeCompleted(HandshakeManager manager) {
+ CompletableFuture<NettySender> handshakeFuture = manager.handshakeFuture();
+ assertTrue(handshakeFuture.isDone());
+ assertFalse(handshakeFuture.isCompletedExceptionally());
+ assertFalse(handshakeFuture.isCancelled());
+ }
+
+ private void addUnacknowledgedMessages(RecoveryDescriptor recoveryDescriptor) {
+ TestMessage msg = TEST_MESSAGES_FACTORY.testMessage().msg("test").build();
+ recoveryDescriptor.add(new OutNetworkObject(msg, Collections.emptyList()));
+ }
+
+ private void exchangeServerToClient(EmbeddedChannel serverSideChannel, EmbeddedChannel clientSideChannel) {
+ ByteBuf handshakeStartMessage = serverSideChannel.readOutbound();
+ clientSideChannel.writeInbound(handshakeStartMessage);
+ }
+
+ private void exchangeClientToServer(EmbeddedChannel serverSideChannel, EmbeddedChannel clientSideChannel) {
+ ByteBuf handshakeStartMessage = clientSideChannel.readOutbound();
+ serverSideChannel.writeInbound(handshakeStartMessage);
+ }
+
+ private final Consumer<InNetworkObject> noMessageListener = inNetworkObject ->
+ fail("Received message while shouldn't have, [" + inNetworkObject.message() + "]");
+
+ private EmbeddedChannel setupChannel(HandshakeManager handshakeManager, Consumer<InNetworkObject> messageListener) throws Exception {
+ // Channel should not be registered at first, not before we add pipeline handlers
+ // Otherwise, events like "channel active" won't be propagated to the handlers
+ var channel = new EmbeddedChannel(false, false);
+
+ var serializationService = new SerializationService(MESSAGE_REGISTRY, createUserObjectSerializationContext());
+ var sessionSerializationService = new PerSessionSerializationService(serializationService);
+
+ PipelineUtils.setup(channel.pipeline(), sessionSerializationService, handshakeManager, messageListener);
+
+ channel.register();
+
+ return channel;
+ }
+
+ private UserObjectSerializationContext createUserObjectSerializationContext() {
+ var userObjectDescriptorRegistry = new ClassDescriptorRegistry();
+ var userObjectDescriptorFactory = new ClassDescriptorFactory(userObjectDescriptorRegistry);
+
+ var userObjectMarshaller = new DefaultUserObjectMarshaller(userObjectDescriptorRegistry, userObjectDescriptorFactory);
+
+ return new UserObjectSerializationContext(userObjectDescriptorRegistry, userObjectDescriptorFactory,
+ userObjectMarshaller);
+ }
+
+ private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(RecoveryDescriptorProvider provider) {
+ return createRecoveryClientHandshakeManager("client", UUID.randomUUID(), provider);
+ }
+
+ private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(String consistentId, UUID launchId,
+ RecoveryDescriptorProvider provider) {
+ return new RecoveryClientHandshakeManager(launchId, consistentId, CONNECTION_ID, MESSAGE_FACTORY, provider);
+ }
+
+ private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(RecoveryDescriptorProvider provider) {
+ return createRecoveryServerHandshakeManager("server", UUID.randomUUID(), provider);
+ }
+
+ private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(String consistentId, UUID launchId,
+ RecoveryDescriptorProvider provider) {
+ return new RecoveryServerHandshakeManager(launchId, consistentId, MESSAGE_FACTORY, provider);
+ }
+
+ private RecoveryDescriptorProvider createRecoveryDescriptorProvider() {
+ return new DefaultRecoveryDescriptorProvider();
+ }
+}