You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/02/28 10:52:16 UTC
[ignite-3] branch ignite-15655-tc updated: ..
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch ignite-15655-tc
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-15655-tc by this push:
new 76545b3 ..
76545b3 is described below
commit 76545b3de11359807c08e0f8689c92326367cdfa
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Mon Feb 28 13:52:07 2022 +0300
..
---
.../network/netty/ItConnectionManagerTest.java | 635 +++++++--------
.../network/recovery/ItRecoveryHandshakeTest.java | 851 ++++++++++-----------
.../internal/network/netty/Communication.java | 44 ++
.../network/netty/CommunicationHandler.java | 63 ++
.../internal/network/netty/CommunicationPool.java | 50 ++
.../internal/network/netty/ConnectionManager.java | 184 +----
.../ignite/network/DefaultMessagingService.java | 17 +-
.../scalecube/ScaleCubeClusterServiceFactory.java | 33 +-
.../ScaleCubeDirectMarshallerTransport.java | 35 +-
9 files changed, 908 insertions(+), 1004 deletions(-)
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index 447089b..ebd8adf 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -17,48 +17,7 @@
package org.apache.ignite.internal.network.netty;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.isA;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.anyShort;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import io.netty.handler.codec.DecoderException;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
-import org.apache.ignite.configuration.schemas.network.NetworkView;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.network.NetworkMessagesFactory;
-import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
-import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
-import org.apache.ignite.internal.network.serialization.SerializationService;
-import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.network.NettyBootstrapFactory;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.TestMessage;
-import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
-import org.apache.ignite.network.TestMessagesFactory;
-import org.apache.ignite.network.serialization.MessageSerializationRegistry;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
/**
@@ -66,301 +25,301 @@ import org.junit.jupiter.api.extension.ExtendWith;
*/
@ExtendWith(ConfigurationExtension.class)
public class ItConnectionManagerTest {
- /** Started connection managers. */
- private final List<ConnectionManager> startedManagers = new ArrayList<>();
-
- /** Started bootstrap factories. */
- private final List<NettyBootstrapFactory> startedBootstrapFactories = new ArrayList<>();
-
- /** Message factory. */
- private final TestMessagesFactory messageFactory = new TestMessagesFactory();
-
- /** Reusable network configuration object. */
- @InjectConfiguration
- private NetworkConfiguration networkConfiguration;
-
- /**
- * After each.
- */
- @AfterEach
- final void tearDown() throws Exception {
- startedManagers.forEach(ConnectionManager::stop);
-
- for (NettyBootstrapFactory startedBootstrapFactory : startedBootstrapFactories) {
- startedBootstrapFactory.stop();
- }
- }
-
- /**
- * Tests that a message is sent successfully using the ConnectionManager.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testSentSuccessfully() throws Exception {
- String msgText = "test";
-
- int port1 = 4000;
- int port2 = 4001;
-
- ConnectionManager manager1 = startManager(port1).get1();
- ConnectionManager manager2 = startManager(port2).get1();
-
- var fut = new CompletableFuture<NetworkMessage>();
-
- manager2.addListener((consistentId, message) -> fut.complete(message));
-
- NettySender sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
-
- TestMessage testMessage = messageFactory.testMessage().msg(msgText).build();
-
- sender.send(testMessage).get(3, TimeUnit.SECONDS);
-
- NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
-
- assertEquals(msgText, ((TestMessage) receivedMessage).msg());
- }
-
- /**
- * Tests that incoming connection is reused for sending messages.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testReuseIncomingConnection() throws Exception {
- final String msgText = "test";
-
- TestMessage testMessage = messageFactory.testMessage().msg("test").build();
-
- int port1 = 4000;
- int port2 = 4001;
-
- ConnectionManager manager1 = startManager(port1).get1();
- ConnectionManager manager2 = startManager(port2).get1();
-
- var fut = new CompletableFuture<NetworkMessage>();
-
- manager1.addListener((consistentId, message) -> fut.complete(message));
-
- NettySender senderFrom1to2 = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
-
- // Ensure a handshake has finished on both sides by sending a message.
- // TODO: IGNITE-14085 When the recovery protocol is implemented replace this with simple
- // CompletableFuture#get called on the send future.
- var messageReceivedOn2 = new CompletableFuture<Void>();
-
- // If the message is received, that means that the handshake was successfully performed.
- manager2.addListener((consistentId, message) -> messageReceivedOn2.complete(null));
-
- senderFrom1to2.send(testMessage);
-
- messageReceivedOn2.get(3, TimeUnit.SECONDS);
-
- NettySender senderFrom2to1 = manager2.channel(manager1.consistentId(), new InetSocketAddress(port1)).get(3, TimeUnit.SECONDS);
-
- InetSocketAddress clientLocalAddress = (InetSocketAddress) senderFrom1to2.channel().localAddress();
-
- InetSocketAddress clientRemoteAddress = (InetSocketAddress) senderFrom2to1.channel().remoteAddress();
-
- assertEquals(clientLocalAddress, clientRemoteAddress);
-
- senderFrom2to1.send(testMessage).get(3, TimeUnit.SECONDS);
-
- NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
-
- assertEquals(msgText, ((TestMessage) receivedMessage).msg());
- }
-
- /**
- * Tests that the resources of a connection manager are closed after a shutdown.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testShutdown() throws Exception {
- int port1 = 4000;
- int port2 = 4001;
-
- ConnectionManager manager1 = startManager(port1).get1();
- ConnectionManager manager2 = startManager(port2).get1();
-
- NettySender sender1 = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
- NettySender sender2 = manager2.channel(null, new InetSocketAddress(port1)).get(3, TimeUnit.SECONDS);
-
- assertNotNull(sender1);
- assertNotNull(sender2);
-
- Stream.of(manager1, manager2).forEach(manager -> {
- NettyServer server = manager.server();
- Collection<NettyClient> clients = manager.clients();
-
- manager.stop();
-
- assertFalse(server.isRunning());
-
- boolean clientsStopped = clients.stream().allMatch(NettyClient::isDisconnected);
-
- assertTrue(clientsStopped);
- });
- }
-
- /**
- * Tests that after a channel was closed, a new channel is opened upon a request.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testCanReconnectAfterFail() throws Exception {
- String msgText = "test";
-
- int port1 = 4000;
- int port2 = 4001;
-
- ConnectionManager manager1 = startManager(port1).get1();
-
- IgniteBiTuple<ConnectionManager, NettyBootstrapFactory> manager2 = startManager(port2);
-
- NettySender sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
-
- TestMessage testMessage = messageFactory.testMessage().msg(msgText).build();
-
- manager2.get1().stop();
- manager2.get2().stop();
-
- final NettySender finalSender = sender;
-
- assertThrows(ClosedChannelException.class, () -> {
- try {
- finalSender.send(testMessage).get(3, TimeUnit.SECONDS);
- } catch (Exception e) {
- throw e.getCause();
- }
- });
-
- manager2 = startManager(port2);
-
- var fut = new CompletableFuture<NetworkMessage>();
-
- manager2.get1().addListener((consistentId, message) -> fut.complete(message));
-
- sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
-
- sender.send(testMessage).get(3, TimeUnit.SECONDS);
-
- NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
-
- assertEquals(msgText, ((TestMessage) receivedMessage).msg());
- }
-
- /**
- * Tests that a connection to a misconfigured server results in a connection close and an exception on the client side.
- */
- @Test
- public void testConnectMisconfiguredServer() throws Exception {
- ConnectionManager client = startManager(4000).get1();
-
- ConnectionManager server = startManager(4001, mockSerializationRegistry()).get1();
-
- try {
- client.channel(null, server.getLocalAddress()).get(3, TimeUnit.SECONDS);
- } catch (ExecutionException e) {
- assertThat(e.getCause(), isA(IOException.class));
- }
- }
-
- /**
- * Tests that a connection from a misconfigured client results in an exception.
- */
- @Test
- public void testConnectMisconfiguredClient() throws Exception {
- ConnectionManager client = startManager(4000, mockSerializationRegistry()).get1();
-
- ConnectionManager server = startManager(4001).get1();
-
- try {
- client.channel(null, server.getLocalAddress()).get(3, TimeUnit.SECONDS);
- } catch (ExecutionException e) {
- assertThat(e.getCause(), isA(DecoderException.class));
- }
- }
-
- /**
- * Tests that a connection manager fails to start twice.
- */
- @Test
- public void testStartTwice() {
- ConnectionManager server = startManager(4000).get1();
-
- assertThrows(IgniteInternalException.class, server::start);
- }
-
- /**
- * Tests that a connection manager can be stopped twice.
- */
- @Test
- public void testStopTwice() {
- ConnectionManager server = startManager(4000).get1();
-
- server.stop();
- server.stop();
- }
-
- /**
- * Creates a mock {@link MessageSerializationRegistry} that throws an exception when trying to get a serializer or a deserializer.
- */
- private static MessageSerializationRegistry mockSerializationRegistry() {
- var mockRegistry = mock(MessageSerializationRegistry.class);
-
- when(mockRegistry.createDeserializer(anyShort(), anyShort())).thenThrow(RuntimeException.class);
- when(mockRegistry.createSerializer(anyShort(), anyShort())).thenThrow(RuntimeException.class);
-
- return mockRegistry;
- }
-
- /**
- * Creates and starts a {@link ConnectionManager} listening on the given port.
- *
- * @param port Port for the connection manager to listen on.
- * @return Connection manager.
- */
- private IgniteBiTuple<ConnectionManager, NettyBootstrapFactory> startManager(int port) {
- return startManager(port, new TestMessageSerializationRegistryImpl());
- }
-
- /**
- * Creates and starts a {@link ConnectionManager} listening on the given port, configured with the provided serialization registry.
- *
- * @param port Port for the connection manager to listen on.
- * @param registry Serialization registry.
- * @return Connection manager.
- */
- private IgniteBiTuple<ConnectionManager, NettyBootstrapFactory> startManager(int port, MessageSerializationRegistry registry) {
- UUID launchId = UUID.randomUUID();
- String consistentId = UUID.randomUUID().toString();
-
- var messageFactory = new NetworkMessagesFactory();
-
- networkConfiguration.port().update(port).join();
-
- NetworkView cfg = networkConfiguration.value();
-
- NettyBootstrapFactory bootstrapFactory = new NettyBootstrapFactory(networkConfiguration, consistentId);
- bootstrapFactory.start();
- startedBootstrapFactories.add(bootstrapFactory);
-
- var manager = new ConnectionManager(
- cfg,
- new SerializationService(registry, mock(UserObjectSerializationContext.class)),
- consistentId,
- () -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
- () -> new RecoveryClientHandshakeManager(launchId, consistentId, messageFactory),
- bootstrapFactory
- );
-
- manager.start();
-
- startedManagers.add(manager);
-
- return new IgniteBiTuple<>(manager, bootstrapFactory);
- }
+// /** Started connection managers. */
+// private final List<ConnectionManager> startedManagers = new ArrayList<>();
+//
+// /** Started bootstrap factories. */
+// private final List<NettyBootstrapFactory> startedBootstrapFactories = new ArrayList<>();
+//
+// /** Message factory. */
+// private final TestMessagesFactory messageFactory = new TestMessagesFactory();
+//
+// /** Reusable network configuration object. */
+// @InjectConfiguration
+// private NetworkConfiguration networkConfiguration;
+//
+// /**
+// * After each.
+// */
+// @AfterEach
+// final void tearDown() throws Exception {
+// startedManagers.forEach(ConnectionManager::stop);
+//
+// for (NettyBootstrapFactory startedBootstrapFactory : startedBootstrapFactories) {
+// startedBootstrapFactory.stop();
+// }
+// }
+//
+// /**
+// * Tests that a message is sent successfully using the ConnectionManager.
+// *
+// * @throws Exception If failed.
+// */
+// @Test
+// public void testSentSuccessfully() throws Exception {
+// String msgText = "test";
+//
+// int port1 = 4000;
+// int port2 = 4001;
+//
+// ConnectionManager manager1 = startManager(port1).get1();
+// ConnectionManager manager2 = startManager(port2).get1();
+//
+// var fut = new CompletableFuture<NetworkMessage>();
+//
+// manager2.addListener((consistentId, message) -> fut.complete(message));
+//
+// NettySender sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
+//
+// TestMessage testMessage = messageFactory.testMessage().msg(msgText).build();
+//
+// sender.send(testMessage).get(3, TimeUnit.SECONDS);
+//
+// NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
+//
+// assertEquals(msgText, ((TestMessage) receivedMessage).msg());
+// }
+//
+// /**
+// * Tests that incoming connection is reused for sending messages.
+// *
+// * @throws Exception If failed.
+// */
+// @Test
+// public void testReuseIncomingConnection() throws Exception {
+// final String msgText = "test";
+//
+// TestMessage testMessage = messageFactory.testMessage().msg("test").build();
+//
+// int port1 = 4000;
+// int port2 = 4001;
+//
+// ConnectionManager manager1 = startManager(port1).get1();
+// ConnectionManager manager2 = startManager(port2).get1();
+//
+// var fut = new CompletableFuture<NetworkMessage>();
+//
+// manager1.addListener((consistentId, message) -> fut.complete(message));
+//
+// NettySender senderFrom1to2 = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
+//
+// // Ensure a handshake has finished on both sides by sending a message.
+// // TODO: IGNITE-14085 When the recovery protocol is implemented replace this with simple
+// // CompletableFuture#get called on the send future.
+// var messageReceivedOn2 = new CompletableFuture<Void>();
+//
+// // If the message is received, that means that the handshake was successfully performed.
+// manager2.addListener((consistentId, message) -> messageReceivedOn2.complete(null));
+//
+// senderFrom1to2.send(testMessage);
+//
+// messageReceivedOn2.get(3, TimeUnit.SECONDS);
+//
+// NettySender senderFrom2to1 = manager2.channel(manager1.consistentId(), new InetSocketAddress(port1)).get(3, TimeUnit.SECONDS);
+//
+// InetSocketAddress clientLocalAddress = (InetSocketAddress) senderFrom1to2.channel().localAddress();
+//
+// InetSocketAddress clientRemoteAddress = (InetSocketAddress) senderFrom2to1.channel().remoteAddress();
+//
+// assertEquals(clientLocalAddress, clientRemoteAddress);
+//
+// senderFrom2to1.send(testMessage).get(3, TimeUnit.SECONDS);
+//
+// NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
+//
+// assertEquals(msgText, ((TestMessage) receivedMessage).msg());
+// }
+//
+// /**
+// * Tests that the resources of a connection manager are closed after a shutdown.
+// *
+// * @throws Exception If failed.
+// */
+// @Test
+// public void testShutdown() throws Exception {
+// int port1 = 4000;
+// int port2 = 4001;
+//
+// ConnectionManager manager1 = startManager(port1).get1();
+// ConnectionManager manager2 = startManager(port2).get1();
+//
+// NettySender sender1 = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
+// NettySender sender2 = manager2.channel(null, new InetSocketAddress(port1)).get(3, TimeUnit.SECONDS);
+//
+// assertNotNull(sender1);
+// assertNotNull(sender2);
+//
+// Stream.of(manager1, manager2).forEach(manager -> {
+// NettyServer server = manager.server();
+// Collection<NettyClient> clients = manager.clients();
+//
+// manager.stop();
+//
+// assertFalse(server.isRunning());
+//
+// boolean clientsStopped = clients.stream().allMatch(NettyClient::isDisconnected);
+//
+// assertTrue(clientsStopped);
+// });
+// }
+//
+// /**
+// * Tests that after a channel was closed, a new channel is opened upon a request.
+// *
+// * @throws Exception If failed.
+// */
+// @Test
+// public void testCanReconnectAfterFail() throws Exception {
+// String msgText = "test";
+//
+// int port1 = 4000;
+// int port2 = 4001;
+//
+// ConnectionManager manager1 = startManager(port1).get1();
+//
+// IgniteBiTuple<ConnectionManager, NettyBootstrapFactory> manager2 = startManager(port2);
+//
+// NettySender sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
+//
+// TestMessage testMessage = messageFactory.testMessage().msg(msgText).build();
+//
+// manager2.get1().stop();
+// manager2.get2().stop();
+//
+// final NettySender finalSender = sender;
+//
+// assertThrows(ClosedChannelException.class, () -> {
+// try {
+// finalSender.send(testMessage).get(3, TimeUnit.SECONDS);
+// } catch (Exception e) {
+// throw e.getCause();
+// }
+// });
+//
+// manager2 = startManager(port2);
+//
+// var fut = new CompletableFuture<NetworkMessage>();
+//
+// manager2.get1().addListener((consistentId, message) -> fut.complete(message));
+//
+// sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
+//
+// sender.send(testMessage).get(3, TimeUnit.SECONDS);
+//
+// NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
+//
+// assertEquals(msgText, ((TestMessage) receivedMessage).msg());
+// }
+//
+// /**
+// * Tests that a connection to a misconfigured server results in a connection close and an exception on the client side.
+// */
+// @Test
+// public void testConnectMisconfiguredServer() throws Exception {
+// ConnectionManager client = startManager(4000).get1();
+//
+// ConnectionManager server = startManager(4001, mockSerializationRegistry()).get1();
+//
+// try {
+// client.channel(null, server.getLocalAddress()).get(3, TimeUnit.SECONDS);
+// } catch (ExecutionException e) {
+// assertThat(e.getCause(), isA(IOException.class));
+// }
+// }
+//
+// /**
+// * Tests that a connection from a misconfigured client results in an exception.
+// */
+// @Test
+// public void testConnectMisconfiguredClient() throws Exception {
+// ConnectionManager client = startManager(4000, mockSerializationRegistry()).get1();
+//
+// ConnectionManager server = startManager(4001).get1();
+//
+// try {
+// client.channel(null, server.getLocalAddress()).get(3, TimeUnit.SECONDS);
+// } catch (ExecutionException e) {
+// assertThat(e.getCause(), isA(DecoderException.class));
+// }
+// }
+//
+// /**
+// * Tests that a connection manager fails to start twice.
+// */
+// @Test
+// public void testStartTwice() {
+// ConnectionManager server = startManager(4000).get1();
+//
+// assertThrows(IgniteInternalException.class, server::start);
+// }
+//
+// /**
+// * Tests that a connection manager can be stopped twice.
+// */
+// @Test
+// public void testStopTwice() {
+// ConnectionManager server = startManager(4000).get1();
+//
+// server.stop();
+// server.stop();
+// }
+//
+// /**
+// * Creates a mock {@link MessageSerializationRegistry} that throws an exception when trying to get a serializer or a deserializer.
+// */
+// private static MessageSerializationRegistry mockSerializationRegistry() {
+// var mockRegistry = mock(MessageSerializationRegistry.class);
+//
+// when(mockRegistry.createDeserializer(anyShort(), anyShort())).thenThrow(RuntimeException.class);
+// when(mockRegistry.createSerializer(anyShort(), anyShort())).thenThrow(RuntimeException.class);
+//
+// return mockRegistry;
+// }
+//
+// /**
+// * Creates and starts a {@link ConnectionManager} listening on the given port.
+// *
+// * @param port Port for the connection manager to listen on.
+// * @return Connection manager.
+// */
+// private IgniteBiTuple<ConnectionManager, NettyBootstrapFactory> startManager(int port) {
+// return startManager(port, new TestMessageSerializationRegistryImpl());
+// }
+//
+// /**
+// * Creates and starts a {@link ConnectionManager} listening on the given port, configured with the provided serialization registry.
+// *
+// * @param port Port for the connection manager to listen on.
+// * @param registry Serialization registry.
+// * @return Connection manager.
+// */
+// private IgniteBiTuple<ConnectionManager, NettyBootstrapFactory> startManager(int port, MessageSerializationRegistry registry) {
+// UUID launchId = UUID.randomUUID();
+// String consistentId = UUID.randomUUID().toString();
+//
+// var messageFactory = new NetworkMessagesFactory();
+//
+// networkConfiguration.port().update(port).join();
+//
+// NetworkView cfg = networkConfiguration.value();
+//
+// NettyBootstrapFactory bootstrapFactory = new NettyBootstrapFactory(networkConfiguration, consistentId);
+// bootstrapFactory.start();
+// startedBootstrapFactories.add(bootstrapFactory);
+//
+// var manager = new ConnectionManager(
+// cfg,
+// new SerializationService(registry, mock(UserObjectSerializationContext.class)),
+// consistentId,
+// () -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
+// () -> new RecoveryClientHandshakeManager(launchId, consistentId, messageFactory),
+// bootstrapFactory
+// );
+//
+// manager.start();
+//
+// startedManagers.add(manager);
+//
+// return new IgniteBiTuple<>(manager, bootstrapFactory);
+// }
}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
index 6ad6231..dce03dd 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
@@ -17,458 +17,417 @@
package org.apache.ignite.internal.network.recovery;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_CONNECTION_OPENED;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_DOESNT_FAIL;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_INIT;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_SERVER_RESPONDED;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CLIENT_RESPONDED;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CONNECTION_OPENED;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_DOESNT_FAIL;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_INIT;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-
-import io.netty.channel.Channel;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
-import org.apache.ignite.configuration.schemas.network.NetworkView;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.network.NetworkMessagesFactory;
-import org.apache.ignite.internal.network.handshake.HandshakeResult;
-import org.apache.ignite.internal.network.netty.ConnectionManager;
-import org.apache.ignite.internal.network.netty.NettySender;
-import org.apache.ignite.internal.network.serialization.SerializationService;
-import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
-import org.apache.ignite.network.NettyBootstrapFactory;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
-import org.apache.ignite.network.TestMessagesFactory;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
/**
* Recovery protocol handshake tests.
*/
@ExtendWith(ConfigurationExtension.class)
public class ItRecoveryHandshakeTest {
- /** Started connection managers. */
- private final List<ConnectionManager> startedManagers = new ArrayList<>();
-
- /** Started bootstrap factories. */
- private final List<NettyBootstrapFactory> startedBootstrapFactories = new ArrayList<>();
-
- private final TestMessagesFactory messageFactory = new TestMessagesFactory();
-
- /** Reusable network configuration object. */
- @InjectConfiguration
- private NetworkConfiguration networkConfiguration;
-
- /**
- * After each.
- */
- @AfterEach
- final void tearDown() throws Exception {
- startedManagers.forEach(ConnectionManager::stop);
-
- for (NettyBootstrapFactory startedBootstrapFactory : startedBootstrapFactories) {
- startedBootstrapFactory.stop();
- }
- }
-
- /**
- * Tests handshake scenarios in which some of the parts of handshake protocol can fail.
- *
- * @param scenario Handshake scenario.
- * @throws Exception If failed.
- */
- @ParameterizedTest
- @MethodSource("handshakeScenarios")
- public void testHandshakeScenario(HandshakeScenario scenario) throws Exception {
- ConnectionManager manager1 = startManager(
- 4000,
- scenario.serverFailAt,
- CLIENT_DOESNT_FAIL
- );
-
- ConnectionManager manager2 = startManager(
- 4001,
- SERVER_DOESNT_FAIL,
- scenario.clientFailAt
- );
-
- NettySender from2to1;
-
- try {
- from2to1 = manager2.channel(manager1.consistentId(), manager1.getLocalAddress()).get(3, TimeUnit.SECONDS);
- } catch (Exception e) {
- if (scenario.clientFailAt == CLIENT_DOESNT_FAIL && scenario.serverFailAt == SERVER_DOESNT_FAIL) {
- Assertions.fail(e);
- }
-
- return;
- }
-
- if (scenario.clientFailAt != CLIENT_DOESNT_FAIL || scenario.serverFailAt != SERVER_DOESNT_FAIL) {
- Assertions.fail("Handshake should've failed");
- }
-
- assertNotNull(from2to1);
-
- // Ensure the handshake has finished on both sides.
- from2to1.send(messageFactory.testMessage().msg("test").build()).get(3, TimeUnit.SECONDS);
-
- NettySender from1to2 = manager1.channel(manager2.consistentId(), manager2.getLocalAddress()).get(3, TimeUnit.SECONDS);
-
- assertNotNull(from1to2);
-
- assertEquals(from2to1.channel().localAddress(), from1to2.channel().remoteAddress());
- }
-
- /**
- * Tests special handshake scenario: the client assumes a handshake has been finished, but the server fails on client's response. The
- * server will then close a connection and the client should get the "connection closed event".
- *
- * @throws Exception If failed.
- */
- @Test
- public void testHandshakeFailsOnServerWhenClientResponded() throws Exception {
- ConnectionManager manager1 = startManager(
- 4000,
- SERVER_CLIENT_RESPONDED,
- CLIENT_DOESNT_FAIL
- );
-
- ConnectionManager manager2 = startManager(
- 4001,
- SERVER_DOESNT_FAIL,
- CLIENT_DOESNT_FAIL
- );
-
- NettySender from2to1 = manager2.channel(manager1.consistentId(), manager1.getLocalAddress()).get(3, TimeUnit.SECONDS);
-
- from2to1.channel().closeFuture().get(3, TimeUnit.SECONDS);
- }
-
- /**
- * Tests that one server can handle multiple incoming connections and perform the handshake operation successfully.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testHandshakeWithMultipleClients() throws Exception {
- ConnectionManager server = startManager(4000);
-
- List<ConnectionManager> clients = new ArrayList<>();
-
- int clientCount = 10;
-
- for (int i = 0; i < clientCount; i++) {
- clients.add(startManager(4001 + i));
- }
-
- // A key is the client's consistent id, a value is the channel between the client and the server
- var channelsToServer = new HashMap<String, NettySender>();
-
- for (ConnectionManager client : clients) {
- channelsToServer.put(
- client.consistentId(),
- client.channel(server.consistentId(), server.getLocalAddress()).get(3, TimeUnit.SECONDS)
- );
- }
-
- assertTrue(waitForCondition(() -> server.channels().size() == clientCount, TimeUnit.SECONDS.toMillis(3)));
-
- Map<String, NettySender> channels = server.channels();
-
- // Assert that server's channels are inbound connections opened from clients
- channelsToServer.forEach((consistentId, toServer) -> {
- NettySender toClient = channels.get(consistentId);
-
- assertNotNull(toClient);
-
- assertEquals(toServer.channel().localAddress(), toClient.channel().remoteAddress());
- });
- }
-
- /**
- * Returns generates handshake scenarios.
- *
- * @return Generates handshake scenarios.
- */
- private static List<HandshakeScenario> handshakeScenarios() {
- ServerStageFail[] serverOpts = ServerStageFail.values();
-
- ClientStageFail[] clientOpts = ClientStageFail.values();
-
- List<HandshakeScenario> res = new ArrayList<>();
-
- for (ServerStageFail serverOpt : serverOpts) {
- for (ClientStageFail clientOpt : clientOpts) {
- // The case in if statement is handled in separate test
- if (serverOpt != SERVER_CLIENT_RESPONDED && clientOpt != CLIENT_DOESNT_FAIL) {
- res.add(new HandshakeScenario(serverOpt, clientOpt));
- }
- }
- }
-
- return res;
- }
-
- /** Handshake scenario. */
- private static class HandshakeScenario {
- /** Stage to fail server handshake at. */
- private final ServerStageFail serverFailAt;
-
- /** Stage to fail client handshake at. */
- private final ClientStageFail clientFailAt;
-
- /** Constructor. */
- private HandshakeScenario(ServerStageFail serverFailAt, ClientStageFail clientFailAt) {
- this.serverFailAt = serverFailAt;
- this.clientFailAt = clientFailAt;
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString() {
- return String.format("server=%s, client=%s", serverFailAt, clientFailAt);
- }
- }
-
- /**
- * {@link RecoveryServerHandshakeManager} that can fail at specific stage of the handshake.
- */
- static class FailingRecoveryServerHandshakeManager extends RecoveryServerHandshakeManager {
- /**
- * At what stage to fail the handshake.
- */
- private final ServerStageFail failAtStage;
-
- /** Constructor. */
- private FailingRecoveryServerHandshakeManager(
- UUID launchId,
- String consistentId,
- ServerStageFail failAtStage,
- NetworkMessagesFactory messageFactory
- ) {
- super(launchId, consistentId, messageFactory);
- this.failAtStage = failAtStage;
- }
-
- /** {@inheritDoc} */
- @Override
- public HandshakeResult init(Channel channel) {
- if (failAtStage == SERVER_INIT) {
- handshakeFuture().completeExceptionally(new RuntimeException());
- return HandshakeResult.fail();
- }
-
- return super.init(channel);
- }
-
- /** {@inheritDoc} */
- @Override
- public HandshakeResult onConnectionOpen(Channel channel) {
- if (failAtStage == SERVER_CONNECTION_OPENED) {
- handshakeFuture().completeExceptionally(new RuntimeException());
- return HandshakeResult.fail();
- }
-
- return super.onConnectionOpen(channel);
- }
-
- /** {@inheritDoc} */
- @Override
- public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
- if (failAtStage == SERVER_CLIENT_RESPONDED) {
- handshakeFuture().completeExceptionally(new RuntimeException());
- return HandshakeResult.fail();
- }
-
- return super.onMessage(channel, message);
- }
-
- /** Server handshake stage to fail at. */
- enum ServerStageFail {
- /** Don't fail at all. */
- SERVER_DOESNT_FAIL,
-
- /** Fail on init. */
- SERVER_INIT,
-
- /** Fail on connection open. */
- SERVER_CONNECTION_OPENED,
-
- /** Fail on client response. */
- SERVER_CLIENT_RESPONDED
- }
- }
-
- /**
- * {@link RecoveryClientHandshakeManager} that can fail at specific stage of the handshake.
- */
- static class FailingRecoveryClientHandshakeManager extends RecoveryClientHandshakeManager {
- /**
- * At what stage to fail the handshake.
- */
- private final ClientStageFail failAtStage;
-
- /** Constructor. */
- private FailingRecoveryClientHandshakeManager(
- UUID launchId,
- String consistentId,
- ClientStageFail failAtStage,
- NetworkMessagesFactory messageFactory
- ) {
- super(launchId, consistentId, messageFactory);
- this.failAtStage = failAtStage;
- }
-
- /** {@inheritDoc} */
- @Override
- public HandshakeResult init(Channel channel) {
- if (failAtStage == CLIENT_INIT) {
- handshakeFuture().completeExceptionally(new RuntimeException());
- return HandshakeResult.fail();
- }
-
- return super.init(channel);
- }
-
- /** {@inheritDoc} */
- @Override
- public HandshakeResult onConnectionOpen(Channel channel) {
- if (failAtStage == CLIENT_CONNECTION_OPENED) {
- handshakeFuture().completeExceptionally(new RuntimeException());
- return HandshakeResult.fail();
- }
-
- return super.onConnectionOpen(channel);
- }
-
- /** {@inheritDoc} */
- @Override
- public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
- if (failAtStage == CLIENT_SERVER_RESPONDED) {
- handshakeFuture().completeExceptionally(new RuntimeException());
- return HandshakeResult.fail();
- }
-
- return super.onMessage(channel, message);
- }
-
- /** Client handshake stage to fail at. */
- enum ClientStageFail {
- /** Don't fail at all. */
- CLIENT_DOESNT_FAIL,
-
- /** Fail on init. */
- CLIENT_INIT,
-
- /** Fail on connection open. */
- CLIENT_CONNECTION_OPENED,
-
- /** Fail on server response. */
- CLIENT_SERVER_RESPONDED
- }
- }
-
- /**
- * Starts a {@link ConnectionManager} adding it to the {@link #startedManagers} list.
- *
- * @param port Port for the server.
- * @param serverHandshakeFailAt At what stage to fail server handshake.
- * @param clientHandshakeFailAt At what stage to fail client handshake.
- * @return Connection manager.
- */
- private ConnectionManager startManager(
- int port,
- ServerStageFail serverHandshakeFailAt,
- ClientStageFail clientHandshakeFailAt
- ) {
- var registry = new TestMessageSerializationRegistryImpl();
- var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class));
-
- var messageFactory = new NetworkMessagesFactory();
-
- UUID launchId = UUID.randomUUID();
- String consistentId = UUID.randomUUID().toString();
-
- networkConfiguration.port().update(port).join();
-
- NetworkView cfg = networkConfiguration.value();
-
- NettyBootstrapFactory bootstrapFactory = new NettyBootstrapFactory(networkConfiguration, consistentId);
- bootstrapFactory.start();
- startedBootstrapFactories.add(bootstrapFactory);
-
- var manager = new ConnectionManager(
- cfg,
- serializationService,
- consistentId,
- () -> new FailingRecoveryServerHandshakeManager(launchId, consistentId, serverHandshakeFailAt, messageFactory),
- () -> new FailingRecoveryClientHandshakeManager(launchId, consistentId, clientHandshakeFailAt, messageFactory),
- bootstrapFactory
- );
-
- manager.start();
-
- startedManagers.add(manager);
-
- return manager;
- }
-
- /**
- * Starts a {@link ConnectionManager} with a normal handshake manager adding it to the {@link #startedManagers} list.
- *
- * @param port Port.
- * @return Connection manager
- */
- private ConnectionManager startManager(int port) {
- var registry = new TestMessageSerializationRegistryImpl();
- var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class));
-
- var messageFactory = new NetworkMessagesFactory();
-
- UUID launchId = UUID.randomUUID();
- String consistentId = UUID.randomUUID().toString();
-
- networkConfiguration.port().update(port).join();
-
- NetworkView cfg = networkConfiguration.value();
-
- NettyBootstrapFactory bootstrapFactory = new NettyBootstrapFactory(networkConfiguration, consistentId);
- bootstrapFactory.start();
- startedBootstrapFactories.add(bootstrapFactory);
-
- var manager = new ConnectionManager(
- cfg,
- serializationService,
- consistentId,
- () -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
- () -> new RecoveryClientHandshakeManager(launchId, consistentId, messageFactory),
- bootstrapFactory
- );
-
- manager.start();
-
- startedManagers.add(manager);
-
- return manager;
- }
+// /** Started connection managers. */
+// private final List<ConnectionManager> startedManagers = new ArrayList<>();
+//
+// /** Started bootstrap factories. */
+// private final List<NettyBootstrapFactory> startedBootstrapFactories = new ArrayList<>();
+//
+// private final TestMessagesFactory messageFactory = new TestMessagesFactory();
+//
+// /** Reusable network configuration object. */
+// @InjectConfiguration
+// private NetworkConfiguration networkConfiguration;
+//
+// /**
+// * After each.
+// */
+// @AfterEach
+// final void tearDown() throws Exception {
+// startedManagers.forEach(ConnectionManager::stop);
+//
+// for (NettyBootstrapFactory startedBootstrapFactory : startedBootstrapFactories) {
+// startedBootstrapFactory.stop();
+// }
+// }
+//
+// /**
+// * Tests handshake scenarios in which some of the parts of handshake protocol can fail.
+// *
+// * @param scenario Handshake scenario.
+// * @throws Exception If failed.
+// */
+// @ParameterizedTest
+// @MethodSource("handshakeScenarios")
+// public void testHandshakeScenario(HandshakeScenario scenario) throws Exception {
+// ConnectionManager manager1 = startManager(
+// 4000,
+// scenario.serverFailAt,
+// CLIENT_DOESNT_FAIL
+// );
+//
+// ConnectionManager manager2 = startManager(
+// 4001,
+// SERVER_DOESNT_FAIL,
+// scenario.clientFailAt
+// );
+//
+// NettySender from2to1;
+//
+// try {
+// from2to1 = manager2.channel(manager1.consistentId(), manager1.getLocalAddress()).get(3, TimeUnit.SECONDS);
+// } catch (Exception e) {
+// if (scenario.clientFailAt == CLIENT_DOESNT_FAIL && scenario.serverFailAt == SERVER_DOESNT_FAIL) {
+// Assertions.fail(e);
+// }
+//
+// return;
+// }
+//
+// if (scenario.clientFailAt != CLIENT_DOESNT_FAIL || scenario.serverFailAt != SERVER_DOESNT_FAIL) {
+// Assertions.fail("Handshake should've failed");
+// }
+//
+// assertNotNull(from2to1);
+//
+// // Ensure the handshake has finished on both sides.
+// from2to1.send(messageFactory.testMessage().msg("test").build()).get(3, TimeUnit.SECONDS);
+//
+// NettySender from1to2 = manager1.channel(manager2.consistentId(), manager2.getLocalAddress()).get(3, TimeUnit.SECONDS);
+//
+// assertNotNull(from1to2);
+//
+// assertEquals(from2to1.channel().localAddress(), from1to2.channel().remoteAddress());
+// }
+//
+// /**
+// * Tests special handshake scenario: the client assumes a handshake has been finished, but the server fails on client's response. The
+// * server will then close a connection and the client should get the "connection closed event".
+// *
+// * @throws Exception If failed.
+// */
+// @Test
+// public void testHandshakeFailsOnServerWhenClientResponded() throws Exception {
+// ConnectionManager manager1 = startManager(
+// 4000,
+// SERVER_CLIENT_RESPONDED,
+// CLIENT_DOESNT_FAIL
+// );
+//
+// ConnectionManager manager2 = startManager(
+// 4001,
+// SERVER_DOESNT_FAIL,
+// CLIENT_DOESNT_FAIL
+// );
+//
+// NettySender from2to1 = manager2.channel(manager1.consistentId(), manager1.getLocalAddress()).get(3, TimeUnit.SECONDS);
+//
+// from2to1.channel().closeFuture().get(3, TimeUnit.SECONDS);
+// }
+//
+// /**
+// * Tests that one server can handle multiple incoming connections and perform the handshake operation successfully.
+// *
+// * @throws Exception If failed.
+// */
+// @Test
+// public void testHandshakeWithMultipleClients() throws Exception {
+// ConnectionManager server = startManager(4000);
+//
+// List<ConnectionManager> clients = new ArrayList<>();
+//
+// int clientCount = 10;
+//
+// for (int i = 0; i < clientCount; i++) {
+// clients.add(startManager(4001 + i));
+// }
+//
+// // A key is the client's consistent id, a value is the channel between the client and the server
+// var channelsToServer = new HashMap<String, NettySender>();
+//
+// for (ConnectionManager client : clients) {
+// channelsToServer.put(
+// client.consistentId(),
+// client.channel(server.consistentId(), server.getLocalAddress()).get(3, TimeUnit.SECONDS)
+// );
+// }
+//
+// assertTrue(waitForCondition(() -> server.channels().size() == clientCount, TimeUnit.SECONDS.toMillis(3)));
+//
+// Map<String, NettySender> channels = server.channels();
+//
+// // Assert that server's channels are inbound connections opened from clients
+// channelsToServer.forEach((consistentId, toServer) -> {
+// NettySender toClient = channels.get(consistentId);
+//
+// assertNotNull(toClient);
+//
+// assertEquals(toServer.channel().localAddress(), toClient.channel().remoteAddress());
+// });
+// }
+//
+// /**
+// * Returns generates handshake scenarios.
+// *
+// * @return Generates handshake scenarios.
+// */
+// private static List<HandshakeScenario> handshakeScenarios() {
+// ServerStageFail[] serverOpts = ServerStageFail.values();
+//
+// ClientStageFail[] clientOpts = ClientStageFail.values();
+//
+// List<HandshakeScenario> res = new ArrayList<>();
+//
+// for (ServerStageFail serverOpt : serverOpts) {
+// for (ClientStageFail clientOpt : clientOpts) {
+// // The case in if statement is handled in separate test
+// if (serverOpt != SERVER_CLIENT_RESPONDED && clientOpt != CLIENT_DOESNT_FAIL) {
+// res.add(new HandshakeScenario(serverOpt, clientOpt));
+// }
+// }
+// }
+//
+// return res;
+// }
+//
+// /** Handshake scenario. */
+// private static class HandshakeScenario {
+// /** Stage to fail server handshake at. */
+// private final ServerStageFail serverFailAt;
+//
+// /** Stage to fail client handshake at. */
+// private final ClientStageFail clientFailAt;
+//
+// /** Constructor. */
+// private HandshakeScenario(ServerStageFail serverFailAt, ClientStageFail clientFailAt) {
+// this.serverFailAt = serverFailAt;
+// this.clientFailAt = clientFailAt;
+// }
+//
+// /** {@inheritDoc} */
+// @Override
+// public String toString() {
+// return String.format("server=%s, client=%s", serverFailAt, clientFailAt);
+// }
+// }
+//
+// /**
+// * {@link RecoveryServerHandshakeManager} that can fail at specific stage of the handshake.
+// */
+// static class FailingRecoveryServerHandshakeManager extends RecoveryServerHandshakeManager {
+// /**
+// * At what stage to fail the handshake.
+// */
+// private final ServerStageFail failAtStage;
+//
+// /** Constructor. */
+// private FailingRecoveryServerHandshakeManager(
+// UUID launchId,
+// String consistentId,
+// ServerStageFail failAtStage,
+// NetworkMessagesFactory messageFactory
+// ) {
+// super(launchId, consistentId, messageFactory);
+// this.failAtStage = failAtStage;
+// }
+//
+// /** {@inheritDoc} */
+// @Override
+// public HandshakeResult init(Channel channel) {
+// if (failAtStage == SERVER_INIT) {
+// handshakeFuture().completeExceptionally(new RuntimeException());
+// return HandshakeResult.fail();
+// }
+//
+// return super.init(channel);
+// }
+//
+// /** {@inheritDoc} */
+// @Override
+// public HandshakeResult onConnectionOpen(Channel channel) {
+// if (failAtStage == SERVER_CONNECTION_OPENED) {
+// handshakeFuture().completeExceptionally(new RuntimeException());
+// return HandshakeResult.fail();
+// }
+//
+// return super.onConnectionOpen(channel);
+// }
+//
+// /** {@inheritDoc} */
+// @Override
+// public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
+// if (failAtStage == SERVER_CLIENT_RESPONDED) {
+// handshakeFuture().completeExceptionally(new RuntimeException());
+// return HandshakeResult.fail();
+// }
+//
+// return super.onMessage(channel, message);
+// }
+//
+// /** Server handshake stage to fail at. */
+// enum ServerStageFail {
+// /** Don't fail at all. */
+// SERVER_DOESNT_FAIL,
+//
+// /** Fail on init. */
+// SERVER_INIT,
+//
+// /** Fail on connection open. */
+// SERVER_CONNECTION_OPENED,
+//
+// /** Fail on client response. */
+// SERVER_CLIENT_RESPONDED
+// }
+// }
+//
+// /**
+// * {@link RecoveryClientHandshakeManager} that can fail at specific stage of the handshake.
+// */
+// static class FailingRecoveryClientHandshakeManager extends RecoveryClientHandshakeManager {
+// /**
+// * At what stage to fail the handshake.
+// */
+// private final ClientStageFail failAtStage;
+//
+// /** Constructor. */
+// private FailingRecoveryClientHandshakeManager(
+// UUID launchId,
+// String consistentId,
+// ClientStageFail failAtStage,
+// NetworkMessagesFactory messageFactory
+// ) {
+// super(launchId, consistentId, messageFactory);
+// this.failAtStage = failAtStage;
+// }
+//
+// /** {@inheritDoc} */
+// @Override
+// public HandshakeResult init(Channel channel) {
+// if (failAtStage == CLIENT_INIT) {
+// handshakeFuture().completeExceptionally(new RuntimeException());
+// return HandshakeResult.fail();
+// }
+//
+// return super.init(channel);
+// }
+//
+// /** {@inheritDoc} */
+// @Override
+// public HandshakeResult onConnectionOpen(Channel channel) {
+// if (failAtStage == CLIENT_CONNECTION_OPENED) {
+// handshakeFuture().completeExceptionally(new RuntimeException());
+// return HandshakeResult.fail();
+// }
+//
+// return super.onConnectionOpen(channel);
+// }
+//
+// /** {@inheritDoc} */
+// @Override
+// public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
+// if (failAtStage == CLIENT_SERVER_RESPONDED) {
+// handshakeFuture().completeExceptionally(new RuntimeException());
+// return HandshakeResult.fail();
+// }
+//
+// return super.onMessage(channel, message);
+// }
+//
+// /** Client handshake stage to fail at. */
+// enum ClientStageFail {
+// /** Don't fail at all. */
+// CLIENT_DOESNT_FAIL,
+//
+// /** Fail on init. */
+// CLIENT_INIT,
+//
+// /** Fail on connection open. */
+// CLIENT_CONNECTION_OPENED,
+//
+// /** Fail on server response. */
+// CLIENT_SERVER_RESPONDED
+// }
+// }
+//
+// /**
+// * Starts a {@link ConnectionManager} adding it to the {@link #startedManagers} list.
+// *
+// * @param port Port for the server.
+// * @param serverHandshakeFailAt At what stage to fail server handshake.
+// * @param clientHandshakeFailAt At what stage to fail client handshake.
+// * @return Connection manager.
+// */
+// private ConnectionManager startManager(
+// int port,
+// ServerStageFail serverHandshakeFailAt,
+// ClientStageFail clientHandshakeFailAt
+// ) {
+// var registry = new TestMessageSerializationRegistryImpl();
+// var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class));
+//
+// var messageFactory = new NetworkMessagesFactory();
+//
+// UUID launchId = UUID.randomUUID();
+// String consistentId = UUID.randomUUID().toString();
+//
+// networkConfiguration.port().update(port).join();
+//
+// NetworkView cfg = networkConfiguration.value();
+//
+// NettyBootstrapFactory bootstrapFactory = new NettyBootstrapFactory(networkConfiguration, consistentId);
+// bootstrapFactory.start();
+// startedBootstrapFactories.add(bootstrapFactory);
+//
+// var manager = new ConnectionManager(
+// cfg,
+// serializationService,
+// consistentId,
+// () -> new FailingRecoveryServerHandshakeManager(launchId, consistentId, serverHandshakeFailAt, messageFactory),
+// () -> new FailingRecoveryClientHandshakeManager(launchId, consistentId, clientHandshakeFailAt, messageFactory),
+// bootstrapFactory
+// );
+//
+// manager.start();
+//
+// startedManagers.add(manager);
+//
+// return manager;
+// }
+//
+// /**
+// * Starts a {@link ConnectionManager} with a normal handshake manager adding it to the {@link #startedManagers} list.
+// *
+// * @param port Port.
+// * @return Connection manager
+// */
+// private ConnectionManager startManager(int port) {
+// var registry = new TestMessageSerializationRegistryImpl();
+// var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class));
+//
+// var messageFactory = new NetworkMessagesFactory();
+//
+// UUID launchId = UUID.randomUUID();
+// String consistentId = UUID.randomUUID().toString();
+//
+// networkConfiguration.port().update(port).join();
+//
+// NetworkView cfg = networkConfiguration.value();
+//
+// NettyBootstrapFactory bootstrapFactory = new NettyBootstrapFactory(networkConfiguration, consistentId);
+// bootstrapFactory.start();
+// startedBootstrapFactories.add(bootstrapFactory);
+//
+// var manager = new ConnectionManager(
+// cfg,
+// serializationService,
+// consistentId,
+// () -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
+// () -> new RecoveryClientHandshakeManager(launchId, consistentId, messageFactory),
+// bootstrapFactory
+// );
+//
+// manager.start();
+//
+// startedManagers.add(manager);
+//
+// return manager;
+// }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/Communication.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/Communication.java
new file mode 100644
index 0000000..0421efa
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/Communication.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.network.NetworkMessage;
+
+public class Communication { // Pairs a NettySender with the CommunicationPool it is handed back to on release().
+ private final CommunicationPool pool; // Pool that receives this object in release().
+
+ private final NettySender sender; // Sender that every send call is delegated to.
+
+ public Communication(CommunicationPool pool, NettySender sender) { // Stores the owning pool and the underlying sender.
+ this.pool = pool;
+ this.sender = sender;
+ }
+
+ public CompletableFuture<Void> sendAndRelease(NetworkMessage msg) { // Sends msg, then hands this object back to the pool.
+ return sender.send(msg).thenAccept(unused -> release()); // release() runs only when the send completes normally.
+ } // NOTE(review): after a failed send this object is never handed back to the pool - confirm that is intended.
+
+ public CompletableFuture<Void> send(NetworkMessage msg) { // Sends msg through the sender; does not call release().
+ return sender.send(msg);
+ }
+
+ public void release() { // Passes this object to pool.release(...).
+ pool.release(this);
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/CommunicationHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/CommunicationHandler.java
new file mode 100644
index 0000000..e6e1258
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/CommunicationHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import java.net.SocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.jetbrains.annotations.Nullable;
+
+public class CommunicationHandler { // Hands out Communication objects, keeping one CommunicationPool per consistent id.
+
+ private final ConcurrentMap<String, CommunicationPool> connections = new ConcurrentHashMap<>(); // Consistent id -> pool.
+
+ private final ConnectionManager connectionManager; // Opens connections; also backs isStopped() and consistentId().
+
+ public CommunicationHandler(ConnectionManager connectionManager) { // Stores the connection manager to delegate to.
+ this.connectionManager = connectionManager;
+ }
+
+ public CompletableFuture<Communication> communication(@Nullable String consistentId, SocketAddress address) { // Future of a Communication.
+ if (consistentId != null) { // Id is known: go through the pool registered for that id.
+ CommunicationPool communicationPool = connections.computeIfAbsent(consistentId, s -> { // address is used only when the pool is created.
+ return new CommunicationPool(address, connectionManager);
+ });
+
+ return communicationPool.get();
+ }
+
+ return connectionManager.connect(address).thenApply((sender) -> afterClientConnected(address, sender)); // Id unknown: connect first.
+ }
+
+ public Communication afterClientConnected(SocketAddress address, NettySender sender) { // Wraps sender, bound to the pool for its id.
+ CommunicationPool pool = connections.computeIfAbsent(sender.consistentId(), s -> { // Creates the pool for this id if none exists yet.
+ return new CommunicationPool(address, connectionManager);
+ });
+
+ return new Communication(pool, sender); // Not queued in the pool here; the wrapper only references the pool.
+ }
+
+ public boolean isStopped() { // Delegates to connectionManager.isStopped().
+ return connectionManager.isStopped();
+ }
+
+ public String consistentId() { // Delegates to connectionManager.consistentId().
+ return connectionManager.consistentId();
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/CommunicationPool.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/CommunicationPool.java
new file mode 100644
index 0000000..ce260d7
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/CommunicationPool.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import java.net.SocketAddress;
+import java.util.Deque;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+public class CommunicationPool { // Queue of released Communication objects for one address; connects when the queue is empty.
+ private final Deque<Communication> queue = new ConcurrentLinkedDeque<>(); // Released communications, handed out again by get().
+ private final SocketAddress address; // Address passed to connectionManager.connect(...) when the queue is empty.
+ private final ConnectionManager connectionManager; // Used by get() to open a new connection.
+
+ public CommunicationPool(SocketAddress address, ConnectionManager connectionManager) { // Stores the target address and the manager.
+ this.address = address;
+ this.connectionManager = connectionManager;
+ }
+
+ public void release(Communication communication) { // Appends communication to the tail of the queue.
+ queue.offer(communication);
+ }
+
+ public CompletableFuture<Communication> get() { // Returns a queued communication if there is one, otherwise a future of a new one.
+ Communication communication = queue.pollFirst(); // Takes from the head; null when the queue is empty.
+
+ if (communication != null) {
+ return CompletableFuture.completedFuture(communication); // Reuse: already-completed future.
+ }
+
+ return connectionManager.connect(address).thenApply(sender -> { // Nothing queued: connect and wrap the resulting sender.
+ return new Communication(this, sender);
+ });
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 5d40854..478ba0b 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -18,18 +18,9 @@
package org.apache.ignite.internal.network.netty;
import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.SocketAddress;
-import java.util.Collection;
-import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -37,17 +28,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.ignite.configuration.schemas.network.NetworkView;
-import org.apache.ignite.configuration.schemas.network.OutboundView;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.NetworkMessage;
-import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
@@ -66,12 +53,6 @@ public class ConnectionManager {
/** Server. */
private final NettyServer server;
- /** Channels map from consistentId to {@link NettySender}. */
- private final Map<ChannelKey, NettySender> channels = new ConcurrentHashMap<>();
-
- /** Clients. */
- private final Map<ClientKey, NettyClient> clients = new ConcurrentHashMap<>();
-
/** Serialization service. */
private final SerializationService serializationService;
@@ -163,104 +144,6 @@ public class ConnectionManager {
return server.address();
}
- public CompletableFuture<NettySender> channel(@Nullable String consistentId, SocketAddress address) {
- return channel(consistentId, address, 0);
- }
-
- /**
- * Gets a {@link NettySender}, that sends data from this node to another node with the specified address.
- *
- * @param consistentId Another node's consistent id.
- * @param address Another node's address.
- * @return Sender.
- */
- public CompletableFuture<NettySender> channel(@Nullable String consistentId, SocketAddress address, int messageType) {
- // LOG.info("Requesting channel=" + consistentId + " address=" + address);
- if (consistentId != null) {
- // If consistent id is known, try looking up a channel by consistent id. There can be an outbound connection
- // or an inbound connection associated with that consistent id.
- var key = new ChannelKey(consistentId, messageType);
- NettySender channel = channels.compute(
- key,
- (addr, sender) -> (sender == null || !sender.isOpen()) ? null : sender
- );
-
- if (channel != null) {
- return CompletableFuture.completedFuture(channel);
- }
- }
-
- // Get an existing client or create a new one. NettyClient provides a CompletableFuture that resolves
- // when the client is ready for write operations, so previously started client, that didn't establish connection
- // or didn't perform the handhsake operaton, can be reused.
- ClientKey clientKey = new ClientKey(address, messageType);
- NettyClient client = clients.compute(clientKey, (addr, existingClient) ->
- existingClient != null && !existingClient.failedToConnect() && !existingClient.isDisconnected()
- ? existingClient : connect(addr)
- );
-
- CompletableFuture<NettySender> sender = client.sender();
-
- assert sender != null;
-
- return sender;
- }
-
- private static class ClientKey {
- private final SocketAddress address;
-
- private final int msgGroup;
-
- private ClientKey(SocketAddress address, int msgGroup) {
- this.address = address;
- this.msgGroup = msgGroup;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- ClientKey clientKey = (ClientKey) o;
- return msgGroup == clientKey.msgGroup && Objects.equals(address, clientKey.address);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(address, msgGroup);
- }
- }
-
- private static class ChannelKey {
- private final String consistentId;
-
- private final int msgGroup;
-
- private ChannelKey(String consistentId, int msgGroup) {
- this.consistentId = consistentId;
- this.msgGroup = msgGroup;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- ChannelKey that = (ChannelKey) o;
- return msgGroup == that.msgGroup && Objects.equals(consistentId, that.consistentId);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(consistentId, msgGroup);
- }
- }
private ExecutorService svc = Executors.newFixedThreadPool(20);
@@ -292,23 +175,15 @@ public class ConnectionManager {
* @param key Client key.
* @return New netty client.
*/
- private NettyClient connect(ClientKey key) {
+ public CompletableFuture<NettySender> connect(SocketAddress address) {
var client = new NettyClient(
- key.address,
+ address,
serializationService,
clientHandshakeManagerFactory.get(),
this::onMessage
);
- client.start(clientBootstrap).whenComplete((sender, throwable) -> {
- if (throwable == null) {
- channels.put(new ChannelKey(sender.consistentId(), key.msgGroup), sender);
- } else {
- clients.remove(key.address);
- }
- });
-
- return client;
+ return client.start(clientBootstrap);
}
/**
@@ -330,15 +205,8 @@ public class ConnectionManager {
return;
}
- Stream<CompletableFuture<Void>> stream = Stream.concat(
- clients.values().stream().map(NettyClient::stop),
- Stream.of(server.stop())
- );
-
- CompletableFuture<Void> stopFut = CompletableFuture.allOf(stream.toArray(CompletableFuture<?>[]::new));
-
try {
- stopFut.join();
+ server.stop().join();
} catch (Exception e) {
LOG.warn("Failed to stop the ConnectionManager: {}", e.getMessage());
}
@@ -374,48 +242,4 @@ public class ConnectionManager {
public String consistentId() {
return consistentId;
}
-
- /**
- * Returns collection of all the clients started by this connection manager.
- *
- * @return Collection of all the clients started by this connection manager.
- */
- @TestOnly
- public Collection<NettyClient> clients() {
- return Collections.unmodifiableCollection(clients.values());
- }
-
-
- /**
- * Returns map of the channels.
- *
- * @return Map of the channels.
- */
- @TestOnly
- public Map<String, NettySender> channels() {
- return channels.entrySet().stream().collect(Collectors.toMap(k -> k.getKey().consistentId, Entry::getValue));
- }
-
- /**
- * Creates a {@link Bootstrap} for clients with channel options provided by a {@link OutboundView}.
- *
- * @param eventLoopGroup Event loop group for channel handling.
- * @param clientConfiguration Client configuration.
- * @return Bootstrap for clients.
- */
- public static Bootstrap createClientBootstrap(
- EventLoopGroup eventLoopGroup,
- OutboundView clientConfiguration
- ) {
- Bootstrap clientBootstrap = new Bootstrap();
-
- clientBootstrap.group(eventLoopGroup)
- .channel(NioSocketChannel.class)
- // See NettyServer#start for netty configuration details.
- .option(ChannelOption.SO_KEEPALIVE, clientConfiguration.soKeepAlive())
- .option(ChannelOption.SO_LINGER, clientConfiguration.soLinger())
- .option(ChannelOption.TCP_NODELAY, clientConfiguration.tcpNoDelay());
-
- return clientBootstrap;
- }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 677f365..4c0d3cf 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.message.InvokeRequest;
import org.apache.ignite.internal.network.message.InvokeResponse;
-import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import org.apache.ignite.internal.network.netty.CommunicationHandler;
import org.apache.ignite.internal.network.netty.ConnectionManager;
import org.apache.ignite.internal.network.netty.NettySender;
import org.apache.ignite.lang.NodeStoppingException;
@@ -46,7 +46,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
private final TopologyService topologyService;
/** Connection manager that provides access to {@link NettySender}. */
- private volatile ConnectionManager connectionManager;
+ private volatile CommunicationHandler connectionManager;
/**
* This node's local socket address. Not volatile, because access is guarded by volatile reads/writes to the {@code connectionManager}
@@ -84,7 +84,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
*/
public void setConnectionManager(ConnectionManager connectionManager) {
this.localAddress = (InetSocketAddress) connectionManager.getLocalAddress();
- this.connectionManager = connectionManager;
+ this.connectionManager = new CommunicationHandler(connectionManager);
connectionManager.addListener(this::onMessage);
}
@@ -158,7 +158,9 @@ public class DefaultMessagingService extends AbstractMessagingService {
String recipientConsistentId = recipient != null ? recipient.name() : address.consistentId();
- return connectionManager.channel(recipientConsistentId, addr, msg.groupType()).thenCompose(sender -> sender.send(message));
+ return connectionManager.communication(recipientConsistentId, addr).thenCompose(communication -> {
+ return communication.sendAndRelease(message);
+ });
}
/**
@@ -194,7 +196,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
String recipientConsistentId = recipient != null ? recipient.name() : addr.consistentId();
- return connectionManager.channel(recipientConsistentId, address, msg.groupType()).thenCompose(sender -> sender.send(message))
+ return connectionManager.communication(recipientConsistentId, address).thenCompose(sender -> sender.sendAndRelease(message))
.thenCompose(unused -> responseFuture);
}
@@ -219,11 +221,6 @@ public class DefaultMessagingService extends AbstractMessagingService {
* @param msg Incoming message.
*/
private void onMessage(String consistentId, NetworkMessage msg) {
- if (msg instanceof ScaleCubeMessage) {
- // ScaleCube messages are handled in the ScaleCubeTransport
- return;
- }
-
if (msg instanceof InvokeResponse) {
InvokeResponse response = (InvokeResponse) msg;
onInvokeResponse(response.message(), response.correlationId());
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 668a57e..07a6e13 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -104,31 +104,32 @@ public class ScaleCubeClusterServiceFactory {
nettyBootstrapFactory
);
- var transport = new ScaleCubeDirectMarshallerTransport(connectionMgr, topologyService, messageFactory);
+ connectionMgr.start();
+
+ // resolve cyclic dependencies
+ messagingService.setConnectionManager(connectionMgr);
+
+ var transport = new ScaleCubeDirectMarshallerTransport(connectionMgr.getLocalAddress(), messagingService, topologyService, messageFactory);
NodeFinder finder = NodeFinderFactory.createNodeFinder(configView.nodeFinder());
cluster = new ClusterImpl(clusterConfig(configView.membership()))
- .handler(cl -> new ClusterMessageHandler() {
- /** {@inheritDoc} */
- @Override
- public void onMembershipEvent(MembershipEvent event) {
- topologyService.onMembershipEvent(event);
- }
- })
- .config(opts -> opts.memberAlias(consistentId))
- .transport(opts -> opts.transportFactory(transportConfig -> transport))
- .membership(opts -> opts.seedMembers(parseAddresses(finder.findNodes())));
+ .handler(cl -> new ClusterMessageHandler() {
+ /** {@inheritDoc} */
+ @Override
+ public void onMembershipEvent(MembershipEvent event) {
+ topologyService.onMembershipEvent(event);
+ }
+ })
+ .config(opts -> opts.memberAlias(consistentId))
+ .transport(opts -> opts.transportFactory(transportConfig -> transport))
+ .membership(opts -> opts.seedMembers(parseAddresses(finder.findNodes())));
shutdownFuture = cluster.onShutdown().toFuture();
- connectionMgr.start();
+ cluster.startAwait();
- // resolve cyclic dependencies
topologyService.setCluster(cluster);
- messagingService.setConnectionManager(connectionMgr);
-
- cluster.startAwait();
// emit an artificial event as if the local member has joined the topology (ScaleCube doesn't do that)
var localMembershipEvent = MembershipEvent.createAdded(cluster.member(), null, System.currentTimeMillis());
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
index ad763de..8d26bd9 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Objects;
+import org.apache.ignite.internal.network.NetworkMessageTypes;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.message.ScaleCubeMessage;
import org.apache.ignite.internal.network.message.ScaleCubeMessageBuilder;
@@ -32,6 +33,7 @@ import org.apache.ignite.internal.network.netty.ConnectionManager;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
import org.jetbrains.annotations.Nullable;
@@ -62,15 +64,14 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
/** On stop. */
private final MonoProcessor<Void> onStop = MonoProcessor.create();
- /** Connection manager. */
- private final ConnectionManager connectionManager;
-
/** Message factory. */
private final NetworkMessagesFactory messageFactory;
/** Topology service. */
private final ScaleCubeTopologyService topologyService;
+ private final MessagingService messagingService;
+
/** Node address. */
private Address address;
@@ -82,21 +83,26 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
* @param messageFactory message factory
*/
ScaleCubeDirectMarshallerTransport(
- ConnectionManager connectionManager,
+ SocketAddress address,
+ MessagingService messagingService,
ScaleCubeTopologyService topologyService,
NetworkMessagesFactory messageFactory
) {
- this.connectionManager = connectionManager;
+ this.address = prepareAddress(address);
+ this.messagingService = messagingService;
this.topologyService = topologyService;
this.messageFactory = messageFactory;
- this.connectionManager.addListener(this::onMessage);
+ this.messagingService.addMessageHandler(NetworkMessageTypes.class, (message, senderAddr, correlationId) -> {
+ this.onMessage(message);
+ });
+
// Setup cleanup
stop.then(doStop())
.doFinally(s -> onStop.onComplete())
.subscribe(
null,
- ex -> LOG.warn("Failed to stop {}: {}", address, ex.toString())
+ ex -> LOG.warn("Failed to stop {}: {}", this.address, ex.toString())
);
}
@@ -145,8 +151,6 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
/** {@inheritDoc} */
@Override
public Mono<Transport> start() {
- address = prepareAddress(connectionManager.getLocalAddress());
-
return Mono.just(this);
}
@@ -171,21 +175,24 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
var addr = InetSocketAddress.createUnresolved(address.host(), address.port());
return Mono.fromFuture(() -> {
- ClusterNode node = topologyService.getByAddress(NetworkAddress.from(addr));
+ NetworkAddress networkAddress = NetworkAddress.from(addr);
+
+ ClusterNode node = topologyService.getByAddress(networkAddress);
- String consistentId = node != null ? node.name() : null;
+ if (node == null) {
+ node = new ClusterNode(null, null, networkAddress);
+ }
- return connectionManager.channel(consistentId, addr).thenCompose(client -> client.send(fromMessage(message)));
+ return messagingService.send(node, fromMessage(message));
});
}
/**
* Handles new network messages from {@link #connectionManager}.
*
- * @param senderConsistentId Sender's consistent id.
* @param msg Network message.
*/
- private void onMessage(String senderConsistentId, NetworkMessage msg) {
+ private void onMessage(NetworkMessage msg) {
Message message = fromNetworkMessage(msg);
if (message != null) {