You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/05/28 09:26:37 UTC
[ignite-3] branch main updated: IGNITE-14082 Implemented handshake
protocol in new netty-based networking implementation. (#135)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e1d4caf IGNITE-14082 Implemented handshake protocol in new netty-based networking implementation. (#135)
e1d4caf is described below
commit e1d4caf48ecca2b8fbb71d29e67716500ac4f910
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Fri May 28 12:26:30 2021 +0300
IGNITE-14082 Implemented handshake protocol in new netty-based networking implementation. (#135)
---
.../processor/internal/Processor.java | 2 +-
.../client/ITMetaStorageServiceTest.java | 10 +-
.../processor/ITAutoSerializableProcessorTest.java | 3 +-
.../internal/AutoSerializableProcessor.java | 7 +-
.../ignite/network/AbstractClusterService.java | 8 +-
.../ignite/network/ClusterLocalConfiguration.java | 8 +-
.../org/apache/ignite/network/ClusterNode.java | 6 +
.../org/apache/ignite/network/ClusterService.java | 6 +-
.../ignite/network/ClusterServiceFactory.java | 7 +-
.../apache/ignite/network/MessagingService.java | 4 +
.../ignite/network/TopologyEventHandler.java | 4 +
.../org/apache/ignite/network/TopologyService.java | 2 +
.../network/serialization/MessageReader.java | 4 +
.../MessageSerializationRegistry.java | 26 +-
.../network/serialization/MessageWriter.java | 4 +
.../org/apache/ignite/network/TestMessage.java | 2 +-
.../netty}/ConnectionManagerTest.java | 86 ++++-
.../internal/recovery/RecoveryHandshakeTest.java | 349 +++++++++++++++++++++
.../network/scalecube/ITNodeRestartsTest.java | 8 +-
.../scalecube/ITScaleCubeNetworkMessagingTest.java | 6 +
.../direct/stream/DirectByteBufferStream.java | 8 +
.../stream/DirectByteBufferStreamImplV1.java | 2 +
.../internal/handshake/HandshakeAction.java} | 23 +-
.../internal/handshake/HandshakeException.java} | 25 +-
.../internal/handshake/HandshakeManager.java | 59 ++++
.../network/internal/netty/ConnectionManager.java | 122 ++++---
.../network/internal/netty/HandshakeHandler.java | 98 ++++++
.../network/internal/netty/InboundDecoder.java | 12 +
.../ignite/network/internal/netty/NettyClient.java | 68 ++--
.../ignite/network/internal/netty/NettySender.java | 130 ++------
.../ignite/network/internal/netty/NettyServer.java | 41 ++-
.../ignite/network/internal/netty/NettyUtils.java | 1 +
.../{NettySender.java => OutboundEncoder.java} | 49 +--
.../recovery/RecoveryClientHandshakeManager.java | 105 +++++++
.../recovery/RecoveryServerHandshakeManager.java | 103 ++++++
.../recovery/message/HandshakeMessageFactory.java} | 18 +-
.../recovery/message/HandshakeStartMessage.java} | 38 ++-
.../message/HandshakeStartMessageImpl.java} | 41 ++-
.../message/HandshakeStartResponseMessage.java | 60 ++++
.../message/HandshakeStartResponseMessageImpl.java | 83 +++++
.../scalecube/ScaleCubeClusterServiceFactory.java | 19 +-
.../ScaleCubeDirectMarshallerTransport.java | 15 +-
.../scalecube/ScaleCubeMessagingService.java | 19 +-
.../scalecube/ScaleCubeTopologyService.java | 4 +
.../network/internal/netty/InboundDecoderTest.java | 4 +-
.../network/internal/netty/NettyClientTest.java | 74 ++++-
.../network/internal/netty/NettyServerTest.java | 118 ++++++-
.../raft/server/ITRaftCounterServerTest.java | 8 +-
.../apache/ignite/internal/app/IgnitionImpl.java | 2 +-
.../ignite/distributed/ITDistributedTableTest.java | 8 +-
50 files changed, 1592 insertions(+), 317 deletions(-)
diff --git a/modules/configuration-annotation-processor/src/main/java/org/apache/ignite/configuration/processor/internal/Processor.java b/modules/configuration-annotation-processor/src/main/java/org/apache/ignite/configuration/processor/internal/Processor.java
index c4bdbb7..217b3b9 100644
--- a/modules/configuration-annotation-processor/src/main/java/org/apache/ignite/configuration/processor/internal/Processor.java
+++ b/modules/configuration-annotation-processor/src/main/java/org/apache/ignite/configuration/processor/internal/Processor.java
@@ -885,6 +885,6 @@ public class Processor extends AbstractProcessor {
/** {@inheritDoc} */
@Override public SourceVersion getSupportedSourceVersion() {
- return SourceVersion.RELEASE_11;
+ return SourceVersion.latest();
}
}
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
index 34df12a..36c04c0 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
@@ -50,6 +50,10 @@ import org.apache.ignite.metastorage.client.WatchListener;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
import org.apache.ignite.network.scalecube.message.ScaleCubeMessageSerializationFactory;
@@ -104,10 +108,10 @@ public class ITMetaStorageServiceTest {
private static final ClusterServiceFactory NETWORK_FACTORY = new ScaleCubeClusterServiceFactory();
/** */
- // TODO: IGNITE-14088 Uncomment and use real serializer provider
- // private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry();
private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry()
- .registerFactory(ScaleCubeMessage.TYPE, new ScaleCubeMessageSerializationFactory());
+ .registerFactory(ScaleCubeMessage.TYPE, new ScaleCubeMessageSerializationFactory())
+ .registerFactory(HandshakeStartMessage.TYPE, new HandshakeStartMessageSerializationFactory())
+ .registerFactory(HandshakeStartResponseMessage.TYPE, new HandshakeStartResponseMessageSerializationFactory());
/** Expected server result entry. */
private static final org.apache.ignite.internal.metastorage.server.Entry EXPECTED_SRV_RESULT_ENTRY =
diff --git a/modules/network-annotation-processor/src/integrationTest/java/org/apache/ignite/network/messages/internal/processor/ITAutoSerializableProcessorTest.java b/modules/network-annotation-processor/src/integrationTest/java/org/apache/ignite/network/messages/internal/processor/ITAutoSerializableProcessorTest.java
index 2d6e9eb..2b1b340 100644
--- a/modules/network-annotation-processor/src/integrationTest/java/org/apache/ignite/network/messages/internal/processor/ITAutoSerializableProcessorTest.java
+++ b/modules/network-annotation-processor/src/integrationTest/java/org/apache/ignite/network/messages/internal/processor/ITAutoSerializableProcessorTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.network.processor.internal;
+package org.apache.ignite.network.messages.internal.processor;
import java.util.Arrays;
import java.util.List;
@@ -25,6 +25,7 @@ import com.google.testing.compile.Compilation;
import com.google.testing.compile.Compiler;
import com.google.testing.compile.JavaFileObjects;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.processor.internal.AutoSerializableProcessor;
import org.junit.jupiter.api.Test;
import static com.google.testing.compile.CompilationSubject.assertThat;
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/network/processor/internal/AutoSerializableProcessor.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/network/processor/internal/AutoSerializableProcessor.java
index bb053ed..202c5a3 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/network/processor/internal/AutoSerializableProcessor.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/network/processor/internal/AutoSerializableProcessor.java
@@ -29,7 +29,6 @@ import java.util.StringJoiner;
import java.util.stream.Collectors;
import javax.annotation.processing.AbstractProcessor;
import javax.annotation.processing.RoundEnvironment;
-import javax.annotation.processing.SupportedSourceVersion;
import javax.lang.model.SourceVersion;
import javax.lang.model.element.Element;
import javax.lang.model.element.ElementKind;
@@ -54,7 +53,6 @@ import org.apache.ignite.network.serialization.MessageSerializer;
* Annotation processor for generating (de-)serializers for network messages marked with the {@link AutoSerializable}
* annotation.
*/
-@SupportedSourceVersion(SourceVersion.RELEASE_11)
public class AutoSerializableProcessor extends AbstractProcessor {
/** {@inheritDoc} */
@Override public Set<String> getSupportedAnnotationTypes() {
@@ -245,4 +243,9 @@ public class AutoSerializableProcessor extends AbstractProcessor {
return false;
}
+
+ /** {@inheritDoc} */
+ @Override public SourceVersion getSupportedSourceVersion() {
+ return SourceVersion.latest();
+ }
}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/AbstractClusterService.java b/modules/network-api/src/main/java/org/apache/ignite/network/AbstractClusterService.java
index ec551d3..8185ab4 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/AbstractClusterService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/AbstractClusterService.java
@@ -33,7 +33,13 @@ public abstract class AbstractClusterService implements ClusterService {
/** Messaging service. */
private final MessagingService messagingService;
- /** */
+ /**
+ * Constructor.
+ *
+ * @param context Cluster context.
+ * @param topologyService Topology service.
+ * @param messagingService Messaging service.
+ */
public AbstractClusterService(
ClusterLocalConfiguration context,
TopologyService topologyService,
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java
index 700faa9..e3d1a4d 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java
@@ -55,28 +55,28 @@ public class ClusterLocalConfiguration {
}
/**
- * Network alias of a node.
+ * @return Network alias of a node.
*/
public String getName() {
return name;
}
/**
- * Port.
+ * @return Port.
*/
public int getPort() {
return port;
}
/**
- * Addresses of other nodes.
+ * @return Addresses of other nodes.
*/
public List<String> getMemberAddresses() {
return memberAddresses;
}
/**
- * Message mapper providers.
+ * @return Message serialization registry.
*/
public MessageSerializationRegistry getSerializationRegistry() {
return serializationRegistry;
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterNode.java b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterNode.java
index 2573a08..a37d723 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterNode.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterNode.java
@@ -40,7 +40,10 @@ public class ClusterNode implements Serializable {
private String address;
/**
+ * @param id Local id that changes between restarts.
* @param name Unique name of member in cluster.
+ * @param host Node host.
+ * @param port Node port.
*/
public ClusterNode(String id, String name, String host, int port) {
this.id = id;
@@ -49,6 +52,9 @@ public class ClusterNode implements Serializable {
this.port = port;
}
+ /**
+ * @return Node's local id.
+ */
public String id() {
return id;
}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterService.java b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterService.java
index d218bf7..bc5db45 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterService.java
@@ -23,17 +23,17 @@ package org.apache.ignite.network;
*/
public interface ClusterService {
/**
- * Returns the {@link TopologyService} for working with the cluster topology.
+ * @return {@link TopologyService} for working with the cluster topology.
*/
TopologyService topologyService();
/**
- * Returns the {@link TopologyService} for sending messages to the cluster members.
+ * @return {@link MessagingService} for sending messages to the cluster members.
*/
MessagingService messagingService();
/**
- * Returns the context associated with the current node.
+ * @return Context associated with the current node.
*/
ClusterLocalConfiguration localConfiguration();
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
index b28c42d..c149208 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
@@ -16,10 +16,15 @@
*/
package org.apache.ignite.network;
-/** */
+/**
+ * Cluster service factory.
+ */
public interface ClusterServiceFactory {
/**
* Creates a new {@link ClusterService} using the provided context. The created network will not be in the "started" state.
+ *
+ * @param context Cluster context.
+ * @return New cluster service.
*/
ClusterService createClusterService(ClusterLocalConfiguration context);
}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
index 7b55c4d..1968aba 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
@@ -43,6 +43,7 @@ public interface MessagingService {
*
* @param recipient Recipient of the message.
* @param msg Message which should be delivered.
+ * @return Future of the send operation.
*/
CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg);
@@ -52,6 +53,7 @@ public interface MessagingService {
* @param recipient Recipient of the message.
* @param msg Message which should be delivered.
* @param correlationId Correlation id when replying to the request.
+ * @return Future of the send operation.
*/
CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg, String correlationId);
@@ -68,6 +70,8 @@ public interface MessagingService {
/**
* Registers a handler for network message events.
+ *
+ * @param handler Message handler.
*/
void addMessageHandler(NetworkMessageHandler handler);
}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java b/modules/network-api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java
index 3c4cb36..67c1065 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java
@@ -22,12 +22,16 @@ package org.apache.ignite.network;
public interface TopologyEventHandler {
/**
* Called when a new member has been detected joining a cluster.
+ *
+ * @param member Appeared cluster member.
*/
void onAppeared(ClusterNode member);
/**
* Indicates that a member has left a cluster. This method is only called when a member leaves permanently (i.e.
* it is not possible to re-establish a connection to it).
+ *
+ * @param member Disappeared cluster member.
*/
void onDisappeared(ClusterNode member);
}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/TopologyService.java b/modules/network-api/src/main/java/org/apache/ignite/network/TopologyService.java
index e343bc0..b7fac7d 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/TopologyService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/TopologyService.java
@@ -38,6 +38,8 @@ public interface TopologyService {
/**
* Registers a handler for topology change events.
+ *
+ * @param handler Topology events handler.
*/
void addEventHandler(TopologyEventHandler handler);
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageReader.java b/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageReader.java
index a313c2f..07c71cd 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageReader.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageReader.java
@@ -232,6 +232,7 @@ public interface MessageReader {
/**
* Reads a nested message.
*
+ * @param <T> Type of a message.
* @param name Field name.
* @return Message.
*/
@@ -240,6 +241,7 @@ public interface MessageReader {
/**
* Reads an array of objects.
*
+ * @param <T> Type of an array.
* @param name Field name.
* @param itemType A component type of the array.
* @param itemCls A component class of the array.
@@ -250,6 +252,7 @@ public interface MessageReader {
/**
* Reads a collection.
*
+ * @param <C> Type of a collection.
* @param name Field name.
* @param itemType An item type of the Collection.
* @return Collection.
@@ -259,6 +262,7 @@ public interface MessageReader {
/**
* Reads a map.
*
+ * @param <M> Type of a map.
* @param name Field name.
* @param keyType The type of the map's key.
* @param valType The type of the map's value.
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageSerializationRegistry.java b/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageSerializationRegistry.java
index 0c8d559..f9fa06e 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageSerializationRegistry.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageSerializationRegistry.java
@@ -23,16 +23,22 @@ import org.apache.ignite.network.NetworkMessage;
/**
* Container that maps message types to {@link MessageSerializationFactory} instances.
*/
-public final class MessageSerializationRegistry {
+public class MessageSerializationRegistry {
/** message type -> MessageSerializerProvider instance */
private final MessageSerializationFactory<?>[] factories = new MessageSerializationFactory<?>[Short.MAX_VALUE << 1];
/**
* Registers message serialization factory by message type.
+ *
+ * @param type Direct type of a message.
+ * @param factory Message's serialization factory.
+ * @return This registry.
+ * @throws NetworkConfigurationException If there is an already registered factory for the given type.
*/
- public MessageSerializationRegistry registerFactory(short type, MessageSerializationFactory<?> factory) {
+ public MessageSerializationRegistry registerFactory(short type,
+ MessageSerializationFactory<?> factory) {
if (this.factories[type] != null)
- throw new NetworkConfigurationException("Message mapper for type " + type + " is already defined");
+ throw new NetworkConfigurationException("Message serialization factory for direct type " + type + " is already defined");
this.factories[type] = factory;
@@ -40,7 +46,11 @@ public final class MessageSerializationRegistry {
}
/**
- * Returns a {@link MessageSerializationFactory} for the given message type.
+ * Gets a {@link MessageSerializationFactory} for the given message type.
+ *
+ * @param <T> Type of a message.
+ * @param type Direct type of a message.
+ * @return Message's serialization factory.
*/
private <T extends NetworkMessage> MessageSerializationFactory<T> getFactory(short type) {
var provider = factories[type];
@@ -56,6 +66,10 @@ public final class MessageSerializationRegistry {
* {@link MessageSerializationRegistry} does not track the correspondence between the message type and its Java
* representation, so the actual generic specialization of the returned provider relies on the caller of this
* method.
+ *
+ * @param <T> Type of a message.
+ * @param type Direct type of a message.
+ * @return Message's serializer.
*/
public <T extends NetworkMessage> MessageSerializer<T> createSerializer(short type) {
MessageSerializationFactory<T> factory = getFactory(type);
@@ -68,6 +82,10 @@ public final class MessageSerializationRegistry {
* {@link MessageSerializationRegistry} does not track the correspondence between the message type and its Java
* representation, so the actual generic specialization of the returned provider relies on the caller of this
* method.
+ *
+ * @param <T> Type of a message.
+ * @param type Direct type of a message.
+ * @return Message's deserializer.
*/
public <T extends NetworkMessage> MessageDeserializer<T> createDeserializer(short type) {
MessageSerializationFactory<T> factory = getFactory(type);
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageWriter.java b/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageWriter.java
index 4f976de..1da4989 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageWriter.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageWriter.java
@@ -266,6 +266,7 @@ public interface MessageWriter {
/**
* Writes an array of objects.
*
+ * @param <T> Type of an array.
* @param name Field name.
* @param arr Array of objects.
* @param itemType A component type of the array.
@@ -276,6 +277,7 @@ public interface MessageWriter {
/**
* Writes collection.
*
+ * @param <T> Type of a collection.
* @param name Field name.
* @param col Collection.
* @param itemType An item type of the collection.
@@ -286,6 +288,8 @@ public interface MessageWriter {
/**
* Writes a map.
*
+ * @param <K> Type of the map's keys.
+ * @param <V> Type of the map's values.
* @param name Field name.
* @param map Map.
* @param keyType The type of the map's key.
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
index dc6de9b..0ed4133 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
@@ -23,7 +23,7 @@ import org.apache.ignite.network.processor.annotations.AutoSerializable;
@AutoSerializable(messageFactory = TestMessageFactory.class)
public interface TestMessage extends NetworkMessage {
/** Visible type for tests. */
- public static final short TYPE = 3;
+ public static final short TYPE = 4;
String msg();
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/internal/netty/ConnectionManagerTest.java
similarity index 58%
rename from modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java
rename to modules/network/src/integrationTest/java/org/apache/ignite/network/internal/netty/ConnectionManagerTest.java
index def8e56..4862080 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/internal/netty/ConnectionManagerTest.java
@@ -15,21 +15,27 @@
* limitations under the License.
*/
-package org.apache.ignite.network;
+package org.apache.ignite.network.internal.netty;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
-import org.apache.ignite.network.internal.netty.ConnectionManager;
-import org.apache.ignite.network.internal.netty.NettyClient;
-import org.apache.ignite.network.internal.netty.NettySender;
-import org.apache.ignite.network.internal.netty.NettyServer;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TestMessageFactory;
+import org.apache.ignite.network.TestMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.network.internal.recovery.RecoveryServerHandshakeManager;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -72,11 +78,52 @@ public class ConnectionManagerTest {
manager2.addListener((address, message) -> fut.complete(message));
- NettySender sender = manager1.channel(new InetSocketAddress(port2)).get();
+ NettySender sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
TestMessage testMessage = TestMessageFactory.testMessage().msg(msgText).build();
- sender.send(testMessage).join();
+ sender.send(testMessage).get(3, TimeUnit.SECONDS);
+
+ NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
+
+ assertEquals(msgText, ((TestMessage) receivedMessage).msg());
+ }
+
+ /**
+ * Tests that incoming connection is reused for sending messages.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testReuseIncomingConnection() throws Exception {
+ String msgText = "test";
+
+ int port1 = 4000;
+ int port2 = 4001;
+
+ ConnectionManager manager1 = startManager(port1);
+ ConnectionManager manager2 = startManager(port2);
+
+ var fut = new CompletableFuture<NetworkMessage>();
+
+ manager1.addListener((address, message) -> fut.complete(message));
+
+ NettySender senderFrom1to2 = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
+
+ // Ensure a handshake has finished on both sides.
+ senderFrom1to2.send(TestMessageFactory.testMessage().msg("test").build()).get(3, TimeUnit.SECONDS);
+
+ NettySender senderFrom2to1 = manager2.channel(manager1.consistentId(), new InetSocketAddress(port1)).get(3, TimeUnit.SECONDS);
+
+ InetSocketAddress clientLocalAddress = (InetSocketAddress) senderFrom1to2.channel().localAddress();
+
+ InetSocketAddress clientRemoteAddress = (InetSocketAddress) senderFrom2to1.channel().remoteAddress();
+
+ assertEquals(clientLocalAddress, clientRemoteAddress);
+
+ TestMessage testMessage = TestMessageFactory.testMessage().msg("test").build();
+
+ senderFrom2to1.send(testMessage).get(3, TimeUnit.SECONDS);
NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
@@ -96,8 +143,8 @@ public class ConnectionManagerTest {
ConnectionManager manager1 = startManager(port1);
ConnectionManager manager2 = startManager(port2);
- NettySender sender1 = manager1.channel(new InetSocketAddress(port2)).get();
- NettySender sender2 = manager2.channel(new InetSocketAddress(port1)).get();
+ NettySender sender1 = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
+ NettySender sender2 = manager2.channel(null, new InetSocketAddress(port1)).get(3, TimeUnit.SECONDS);
assertNotNull(sender1);
assertNotNull(sender2);
@@ -131,7 +178,7 @@ public class ConnectionManagerTest {
ConnectionManager manager1 = startManager(port1);
ConnectionManager manager2 = startManager(port2);
- NettySender sender = manager1.channel(new InetSocketAddress(port2)).get();
+ NettySender sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
TestMessage testMessage = TestMessageFactory.testMessage().msg(msgText).build();
@@ -141,7 +188,7 @@ public class ConnectionManagerTest {
assertThrows(ClosedChannelException.class, () -> {
try {
- finalSender.send(testMessage).join();
+ finalSender.send(testMessage).get(3, TimeUnit.SECONDS);
}
catch (Exception e) {
throw e.getCause();
@@ -154,9 +201,9 @@ public class ConnectionManagerTest {
manager2.addListener((address, message) -> fut.complete(message));
- sender = manager1.channel(new InetSocketAddress(port2)).get();
+ sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
- sender.send(testMessage).join();
+ sender.send(testMessage).get(3, TimeUnit.SECONDS);
NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
@@ -171,9 +218,20 @@ public class ConnectionManagerTest {
*/
private ConnectionManager startManager(int port) {
var registry = new MessageSerializationRegistry()
+ .registerFactory(HandshakeStartMessage.TYPE, new HandshakeStartMessageSerializationFactory())
+ .registerFactory(HandshakeStartResponseMessage.TYPE, new HandshakeStartResponseMessageSerializationFactory())
.registerFactory(TestMessage.TYPE, new TestMessageSerializationFactory());
- var manager = new ConnectionManager(port, registry);
+ UUID launchId = UUID.randomUUID();
+ String consistentId = UUID.randomUUID().toString();
+
+ var manager = new ConnectionManager(
+ port,
+ registry,
+ consistentId,
+ () -> new RecoveryServerHandshakeManager(launchId, consistentId),
+ () -> new RecoveryClientHandshakeManager(launchId, consistentId)
+ );
manager.start();
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/internal/recovery/RecoveryHandshakeTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/internal/recovery/RecoveryHandshakeTest.java
new file mode 100644
index 0000000..e717951
--- /dev/null
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/internal/recovery/RecoveryHandshakeTest.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import io.netty.channel.Channel;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TestMessageFactory;
+import org.apache.ignite.network.TestMessageSerializationFactory;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_CONNECTION_OPENED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_DOESNT_FAIL;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_INIT;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_SERVER_RESPONDED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CLIENT_RESPONDED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CONNECTION_OPENED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_DOESNT_FAIL;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_INIT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Recovery protocol handshake tests.
+ */
+public class RecoveryHandshakeTest {
+ /** Started connection managers. */
+ private final List<ConnectionManager> startedManagers = new ArrayList<>();
+
+ /** */
+ @AfterEach
+ final void tearDown() {
+ startedManagers.forEach(ConnectionManager::stop);
+ }
+
+ /**
+ * Tests handshake scenarios in which some of the parts of handshake protocol can fail.
+ *
+ * @param scenario Handshake scenario.
+ * @throws Exception If failed.
+ */
+ @ParameterizedTest
+ @MethodSource("handshakeScenarios")
+ public void testHandshakeScenario(HandshakeScenario scenario) throws Exception {
+ ConnectionManager manager1 = startManager(
+ 4000,
+ scenario.serverFailAt,
+ CLIENT_DOESNT_FAIL
+ );
+
+ ConnectionManager manager2 = startManager(
+ 4001,
+ SERVER_DOESNT_FAIL,
+ scenario.clientFailAt
+ );
+
+ NettySender from2to1;
+
+ try {
+ from2to1 = manager2.channel(manager1.consistentId(), manager1.getLocalAddress()).get(3, TimeUnit.SECONDS);
+ }
+ catch (Exception e) {
+ if (scenario.clientFailAt == CLIENT_DOESNT_FAIL &&
+ scenario.serverFailAt == SERVER_DOESNT_FAIL)
+ Assertions.fail(e);
+
+ return;
+ }
+
+ if (scenario.clientFailAt != CLIENT_DOESNT_FAIL || scenario.serverFailAt != SERVER_DOESNT_FAIL)
+ Assertions.fail("Handshake should've failed");
+
+ assertNotNull(from2to1);
+
+ // Ensure the handshake has finished on both sides.
+ from2to1.send(TestMessageFactory.testMessage().msg("test").build()).get(3, TimeUnit.SECONDS);
+
+ NettySender from1to2 = manager1.channel(manager2.consistentId(), manager2.getLocalAddress()).get(3, TimeUnit.SECONDS);
+
+ assertNotNull(from1to2);
+
+ assertEquals(from2to1.channel().localAddress(), from1to2.channel().remoteAddress());
+ }
+
+ /**
+ * Tests special handshake scenario: the client assumes a handshake has been finished, but the server fails
+ * on client's response. The server will then close a connection and the client should get the
+ * "connection closed event".
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testHandshakeFailsOnServerWhenClientResponded() throws Exception {
+ ConnectionManager manager1 = startManager(
+ 4000,
+ SERVER_CLIENT_RESPONDED,
+ CLIENT_DOESNT_FAIL
+ );
+
+ ConnectionManager manager2 = startManager(
+ 4001,
+ SERVER_DOESNT_FAIL,
+ CLIENT_DOESNT_FAIL
+ );
+
+ NettySender from2to1 = manager2.channel(manager1.consistentId(), manager1.getLocalAddress()).get(3, TimeUnit.SECONDS);
+
+ from2to1.channel().closeFuture().get(3, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Builds every server/client failure-stage combination except the one covered by
+ * {@link #testHandshakeFailsOnServerWhenClientResponded()}.
+ * The combination where neither side fails is included, so the successful path is exercised too.
+ *
+ * @return Handshake scenarios for {@link #testHandshakeScenario(HandshakeScenario)}.
+ */
+ private static List<HandshakeScenario> handshakeScenarios() {
+ List<HandshakeScenario> res = new ArrayList<>();
+
+ for (ServerStageFail serverOpt : ServerStageFail.values())
+ for (ClientStageFail clientOpt : ClientStageFail.values())
+ // Only the (SERVER_CLIENT_RESPONDED, CLIENT_DOESNT_FAIL) pair is handled in a separate test.
+ if (serverOpt != SERVER_CLIENT_RESPONDED || clientOpt != CLIENT_DOESNT_FAIL)
+ res.add(new HandshakeScenario(serverOpt, clientOpt));
+
+ return res;
+ }
+
+ /** Handshake scenario. */
+ private static class HandshakeScenario {
+ /** Stage to fail server handshake at. */
+ private final ServerStageFail serverFailAt;
+
+ /** Stage to fail client handshake at. */
+ private final ClientStageFail clientFailAt;
+
+ /** Constructor. */
+ private HandshakeScenario(ServerStageFail serverFailAt, ClientStageFail clientFailAt) {
+ this.serverFailAt = serverFailAt;
+ this.clientFailAt = clientFailAt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return String.format("server=%s, client=%s", serverFailAt, clientFailAt);
+ }
+ }
+
+ /**
+ * {@link RecoveryServerHandshakeManager} that can fail at specific stage of the handshake.
+ */
+ static class FailingRecoveryServerHandshakeManager extends RecoveryServerHandshakeManager {
+ /**
+ * At what stage to fail the handshake.
+ */
+ private final ServerStageFail failAtStage;
+
+ /** Constructor. */
+ private FailingRecoveryServerHandshakeManager(
+ UUID launchId,
+ String consistentId,
+ ServerStageFail failAtStage
+ ) {
+ super(launchId, consistentId);
+ this.failAtStage = failAtStage;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HandshakeAction init(Channel channel) {
+ if (failAtStage == SERVER_INIT) {
+ handshakeFuture().completeExceptionally(new RuntimeException());
+ return HandshakeAction.FAIL;
+ }
+
+ return super.init(channel);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HandshakeAction onConnectionOpen(Channel channel) {
+ if (failAtStage == SERVER_CONNECTION_OPENED) {
+ handshakeFuture().completeExceptionally(new RuntimeException());
+ return HandshakeAction.FAIL;
+ }
+
+ return super.onConnectionOpen(channel);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HandshakeAction onMessage(Channel channel, NetworkMessage message) {
+ if (failAtStage == SERVER_CLIENT_RESPONDED) {
+ handshakeFuture().completeExceptionally(new RuntimeException());
+ return HandshakeAction.FAIL;
+ }
+
+ return super.onMessage(channel, message);
+ }
+
+ /** Server handshake stage to fail at. */
+ enum ServerStageFail {
+ /** Don't fail at all. */
+ SERVER_DOESNT_FAIL,
+
+ /** Fail on init. */
+ SERVER_INIT,
+
+ /** Fail on connection open. */
+ SERVER_CONNECTION_OPENED,
+
+ /** Fail on client response. */
+ SERVER_CLIENT_RESPONDED
+ }
+ }
+
+ /**
+ * {@link RecoveryClientHandshakeManager} that can fail at specific stage of the handshake.
+ */
+ static class FailingRecoveryClientHandshakeManager extends RecoveryClientHandshakeManager {
+ /**
+ * At what stage to fail the handshake.
+ */
+ private final ClientStageFail failAtStage;
+
+ /** Constructor. */
+ private FailingRecoveryClientHandshakeManager(
+ UUID launchId,
+ String consistentId,
+ ClientStageFail failAtStage
+ ) {
+ super(launchId, consistentId);
+ this.failAtStage = failAtStage;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HandshakeAction init(Channel channel) {
+ if (failAtStage == CLIENT_INIT) {
+ handshakeFuture().completeExceptionally(new RuntimeException());
+ return HandshakeAction.FAIL;
+ }
+
+ return super.init(channel);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HandshakeAction onConnectionOpen(Channel channel) {
+ if (failAtStage == CLIENT_CONNECTION_OPENED) {
+ handshakeFuture().completeExceptionally(new RuntimeException());
+ return HandshakeAction.FAIL;
+ }
+
+ return super.onConnectionOpen(channel);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HandshakeAction onMessage(Channel channel, NetworkMessage message) {
+ if (failAtStage == CLIENT_SERVER_RESPONDED) {
+ handshakeFuture().completeExceptionally(new RuntimeException());
+ return HandshakeAction.FAIL;
+ }
+
+ return super.onMessage(channel, message);
+ }
+
+ /** Client handshake stage to fail at. */
+ enum ClientStageFail {
+ /** Don't fail at all. */
+ CLIENT_DOESNT_FAIL,
+
+ /** Fail on init. */
+ CLIENT_INIT,
+
+ /** Fail on connection open. */
+ CLIENT_CONNECTION_OPENED,
+
+ /** Fail on server response. */
+ CLIENT_SERVER_RESPONDED
+ }
+ }
+
+ /**
+ * Create and start a {@link ConnectionManager} adding it to the {@link #startedManagers} list.
+ *
+ * @param port Port for the {@link ConnectionManager#server}.
+ * @param serverHandshakeFailAt At what stage to fail server handshake.
+ * @param clientHandshakeFailAt At what stage to fail client handshake.
+ * @return Connection manager.
+ */
+ private ConnectionManager startManager(
+ int port,
+ ServerStageFail serverHandshakeFailAt,
+ ClientStageFail clientHandshakeFailAt
+ ) {
+ var registry = new MessageSerializationRegistry()
+ .registerFactory(HandshakeStartMessage.TYPE, new HandshakeStartMessageSerializationFactory())
+ .registerFactory(HandshakeStartResponseMessage.TYPE, new HandshakeStartResponseMessageSerializationFactory())
+ .registerFactory(TestMessage.TYPE, new TestMessageSerializationFactory());
+
+ UUID launchId = UUID.randomUUID();
+ String consistentId = UUID.randomUUID().toString();
+
+ var manager = new ConnectionManager(
+ port,
+ registry,
+ consistentId,
+ () -> new FailingRecoveryServerHandshakeManager(launchId, consistentId, serverHandshakeFailAt),
+ () -> new FailingRecoveryClientHandshakeManager(launchId, consistentId, clientHandshakeFailAt)
+ );
+
+ manager.start();
+
+ startedManagers.add(manager);
+
+ return manager;
+ }
+
+}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
index e7fd486..efd4b79 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
@@ -23,6 +23,10 @@ import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
import org.apache.ignite.network.scalecube.message.ScaleCubeMessageSerializationFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
@@ -41,7 +45,9 @@ class ITNodeRestartsTest {
/** */
private final MessageSerializationRegistry serializationRegistry = new MessageSerializationRegistry()
- .registerFactory(ScaleCubeMessage.TYPE, new ScaleCubeMessageSerializationFactory());
+ .registerFactory(ScaleCubeMessage.TYPE, new ScaleCubeMessageSerializationFactory())
+ .registerFactory(HandshakeStartMessage.TYPE, new HandshakeStartMessageSerializationFactory())
+ .registerFactory(HandshakeStartResponseMessage.TYPE, new HandshakeStartResponseMessageSerializationFactory());
/** */
private final ClusterServiceFactory networkFactory = new TestScaleCubeClusterServiceFactory();
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
index 6a519c2..b24be1b 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -38,6 +38,10 @@ import org.apache.ignite.network.TestMessage;
import org.apache.ignite.network.TestMessageFactory;
import org.apache.ignite.network.TestMessageSerializationFactory;
import org.apache.ignite.network.TopologyEventHandler;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
import org.apache.ignite.network.scalecube.message.ScaleCubeMessageSerializationFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
@@ -266,6 +270,8 @@ class ITScaleCubeNetworkMessagingTest {
/** */
private final MessageSerializationRegistry serializationRegistry = new MessageSerializationRegistry()
.registerFactory(ScaleCubeMessage.TYPE, new ScaleCubeMessageSerializationFactory())
+ .registerFactory(HandshakeStartMessage.TYPE, new HandshakeStartMessageSerializationFactory())
+ .registerFactory(HandshakeStartResponseMessage.TYPE, new HandshakeStartResponseMessageSerializationFactory())
.registerFactory(TestMessage.TYPE, new TestMessageSerializationFactory());
/** */
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStream.java b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStream.java
index 1986e4b..4dd0f36 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStream.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStream.java
@@ -167,6 +167,7 @@ public interface DirectByteBufferStream {
public void writeMessage(NetworkMessage msg, MessageWriter writer);
/**
+ * @param <T> Type of the array.
* @param arr Array.
* @param itemType Component type.
* @param writer Writer.
@@ -174,6 +175,7 @@ public interface DirectByteBufferStream {
public <T> void writeObjectArray(T[] arr, MessageCollectionItemType itemType, MessageWriter writer);
/**
+ * @param <T> Type of the collection.
* @param col Collection.
* @param itemType Component type.
* @param writer Writer.
@@ -181,6 +183,8 @@ public interface DirectByteBufferStream {
public <T> void writeCollection(Collection<T> col, MessageCollectionItemType itemType, MessageWriter writer);
/**
+ * @param <K> Type of the map's keys.
+ * @param <V> Type of the map's values.
* @param map Map.
* @param keyType Key type.
* @param valType Value type.
@@ -290,12 +294,14 @@ public interface DirectByteBufferStream {
public IgniteUuid readIgniteUuid();
/**
+ * @param <T> Type of a message.
* @param reader Reader.
* @return Message.
*/
public <T extends NetworkMessage> T readMessage(MessageReader reader);
/**
+ * @param <T> Type of an array.
* @param itemType Item type.
* @param itemCls Item class.
* @param reader Reader.
@@ -304,6 +310,7 @@ public interface DirectByteBufferStream {
public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls, MessageReader reader);
/**
+ * @param <C> Type of a collection.
* @param itemType Item type.
* @param reader Reader.
* @return Collection.
@@ -311,6 +318,7 @@ public interface DirectByteBufferStream {
public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType, MessageReader reader);
/**
+ * @param <M> Type of a map.
* @param keyType Key type.
* @param valType Value type.
* @param linked Whether linked map should be created.
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java
index 29a5cb3..eaaf834 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java
@@ -1321,6 +1321,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
}
/**
+ * @param <T> Type of an array.
* @param creator Array creator.
* @param lenShift Array length shift size.
* @param off Base offset.
@@ -1383,6 +1384,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
}
/**
+ * @param <T> Type of an array.
* @param creator Array creator.
* @param typeSize Primitive type size in bytes.
* @param lenShift Array length shift size.
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeAction.java
similarity index 64%
copy from modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
copy to modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeAction.java
index b28c42d..4900223 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeAction.java
@@ -14,12 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.network;
-/** */
-public interface ClusterServiceFactory {
- /**
- * Creates a new {@link ClusterService} using the provided context. The created network will not be in the "started" state.
- */
- ClusterService createClusterService(ClusterLocalConfiguration context);
+package org.apache.ignite.network.internal.handshake;
+
+import org.apache.ignite.network.internal.netty.HandshakeHandler;
+
+/**
+ * Enum representing actions that are propagated from a {@link HandshakeManager} to a {@link HandshakeHandler}.
+ */
+public enum HandshakeAction {
+ /** Fail the handshake operation and close the channel. */
+ FAIL,
+
+ /** Remove the handshake handler. */
+ REMOVE_HANDLER,
+
+ /** Do nothing. */
+ NOOP
}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java b/modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeException.java
similarity index 62%
copy from modules/network-api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java
copy to modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeException.java
index 3c4cb36..f5fdc59 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeException.java
@@ -14,20 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.network;
+
+package org.apache.ignite.network.internal.handshake;
/**
- * Interface for handling events related to topology changes.
+ * Handshake exception.
*/
-public interface TopologyEventHandler {
+public class HandshakeException extends Exception {
/**
- * Called when a new member has been detected joining a cluster.
+ * Constructor.
+ *
+ * @param message Handshake error message.
*/
- void onAppeared(ClusterNode member);
+ public HandshakeException(String message) {
+ super(message);
+ }
/**
- * Indicates that a member has left a cluster. This method is only called when a member leaves permanently (i.e.
- * it is not possible to re-establish a connection to it).
+ * Constructor.
+ *
+ * @param message Handshake error message.
+ * @param cause Handshake error cause.
*/
- void onDisappeared(ClusterNode member);
+ public HandshakeException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeManager.java
new file mode 100644
index 0000000..aaacc9d
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeManager.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.handshake;
+
+import java.util.concurrent.CompletableFuture;
+import io.netty.channel.Channel;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.internal.netty.HandshakeHandler;
+import org.apache.ignite.network.internal.netty.NettySender;
+
+/**
+ * Handshake operation manager.
+ */
+public interface HandshakeManager {
+ /**
+ * Initialize handshake manager with the channel.
+ *
+ * @param channel Channel.
+ * @return Action to perform by {@link HandshakeHandler}.
+ */
+ HandshakeAction init(Channel channel);
+
+ /**
+ * Handle an event of the connection opening.
+ *
+ * @param channel Channel.
+ * @return Action to perform by {@link HandshakeHandler}.
+ */
+ HandshakeAction onConnectionOpen(Channel channel);
+
+ /**
+ * Handle an incoming message.
+ *
+ * @param channel Channel.
+ * @param message Message to handle.
+ * @return Action to perform by {@link HandshakeHandler}.
+ */
+ HandshakeAction onMessage(Channel channel, NetworkMessage message);
+
+ /**
+ * @return Future that represents the handshake operation.
+ */
+ CompletableFuture<NettySender> handshakeFuture();
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
index 3b3f94c..ae5ae16 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
@@ -28,19 +28,19 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
+import java.util.function.Supplier;
import java.util.stream.Stream;
import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.internal.handshake.HandshakeManager;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
@@ -62,8 +62,8 @@ public class ConnectionManager {
/** Server. */
private final NettyServer server;
- /** Channels. */
- private final Map<SocketAddress, NettySender> channels = new ConcurrentHashMap<>();
+ /** Channels map from consistentId to {@link NettySender}. */
+ private final Map<String, NettySender> channels = new ConcurrentHashMap<>();
/** Clients. */
private final Map<SocketAddress, NettyClient> clients = new ConcurrentHashMap<>();
@@ -74,16 +74,39 @@ public class ConnectionManager {
/** Message listeners. */
private final List<BiConsumer<SocketAddress, NetworkMessage>> listeners = new CopyOnWriteArrayList<>();
+ /** Node consistent id. */
+ private final String consistentId;
+
+ /** Client handshake manager factory. */
+ private final Supplier<HandshakeManager> clientHandshakeManagerFactory;
+
/**
* Constructor.
*
* @param port Server port.
* @param registry Serialization registry.
+ * @param consistentId Consistent id of this node.
+ * @param serverHandshakeManagerFactory Server handshake manager factory.
+ * @param clientHandshakeManagerFactory Client handshake manager factory.
*/
- public ConnectionManager(int port, MessageSerializationRegistry registry) {
+ public ConnectionManager(
+ int port,
+ MessageSerializationRegistry registry,
+ String consistentId,
+ Supplier<HandshakeManager> serverHandshakeManagerFactory,
+ Supplier<HandshakeManager> clientHandshakeManagerFactory
+ ) {
this.serializationRegistry = registry;
- this.server = new NettyServer(port, this::onNewIncomingChannel, this::onMessage, serializationRegistry);
- this.clientBootstrap = createClientBootstrap(clientWorkerGroup, serializationRegistry, this::onMessage);
+ this.consistentId = consistentId;
+ this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
+ this.server = new NettyServer(
+ port,
+ serverHandshakeManagerFactory.get(),
+ this::onNewIncomingChannel,
+ this::onMessage,
+ serializationRegistry
+ );
+ this.clientBootstrap = createClientBootstrap(clientWorkerGroup, serializationRegistry);
}
/**
@@ -110,24 +133,36 @@ public class ConnectionManager {
/**
* Gets a {@link NettySender}, that sends data from this node to another node with the specified address.
+ * @param consistentId Another node's consistent id.
* @param address Another node's address.
* @return Sender.
*/
- public CompletableFuture<NettySender> channel(SocketAddress address) {
- NettySender channel = channels.compute(
- address,
- (addr, sender) -> (sender == null || !sender.isOpen()) ? null : sender
- );
-
- if (channel != null)
- return CompletableFuture.completedFuture(channel);
+ public CompletableFuture<NettySender> channel(@Nullable String consistentId, SocketAddress address) {
+ if (consistentId != null) {
+ // If consistent id is known, try looking up a channel by consistent id. There can be an outbound connection
+ // or an inbound connection associated with that consistent id.
+ NettySender channel = channels.compute(
+ consistentId,
+ (addr, sender) -> (sender == null || !sender.isOpen()) ? null : sender
+ );
+
+ if (channel != null)
+ return CompletableFuture.completedFuture(channel);
+ }
+ // Get an existing client or create a new one. NettyClient provides a CompletableFuture that resolves
+ // when the client is ready for write operations, so a previously started client that hasn't yet established
+ // a connection or completed the handshake operation can be reused.
NettyClient client = clients.compute(address, (addr, existingClient) ->
existingClient != null && !existingClient.failedToConnect() && !existingClient.isDisconnected() ?
existingClient : connect(addr)
);
- return client.sender();
+ CompletableFuture<NettySender> sender = client.sender();
+
+ assert sender != null;
+
+ return sender;
}
/**
@@ -146,8 +181,7 @@ public class ConnectionManager {
* @param channel Channel from client to this {@link #server}.
*/
private void onNewIncomingChannel(NettySender channel) {
- SocketAddress remoteAddress = channel.remoteAddress();
- channels.put(remoteAddress, channel);
+ channels.put(channel.consistentId(), channel);
}
/**
@@ -156,14 +190,21 @@ public class ConnectionManager {
* @param address Target address.
* @return New netty client.
*/
- private NettyClient connect(SocketAddress address) {
- NettyClient client = new NettyClient(address, serializationRegistry);
+ private NettyClient connect(
+ SocketAddress address
+ ) {
+ var client = new NettyClient(
+ address,
+ serializationRegistry,
+ clientHandshakeManagerFactory.get(),
+ this::onMessage
+ );
client.start(clientBootstrap).whenComplete((sender, throwable) -> {
- if (throwable != null)
- clients.remove(address);
+ if (throwable == null)
+ channels.put(sender.consistentId(), sender);
else
- channels.put(address, sender);
+ clients.remove(address);
});
return client;
@@ -208,6 +249,14 @@ public class ConnectionManager {
}
/**
+ * @return This node's consistent id.
+ */
+ @TestOnly
+ public String consistentId() {
+ return consistentId;
+ }
+
+ /**
* @return Collection of all the clients started by this connection manager.
*/
@TestOnly
@@ -215,18 +264,25 @@ public class ConnectionManager {
return Collections.unmodifiableCollection(clients.values());
}
+
+ /**
+ * @return Map of the channels.
+ */
+ @TestOnly
+ public Map<String, NettySender> channels() {
+ return Collections.unmodifiableMap(channels);
+ }
+
/**
* Creates a {@link Bootstrap} for clients, providing channel handlers and options.
*
* @param eventLoopGroup Event loop group for channel handling.
* @param serializationRegistry Serialization registry.
- * @param messageListener Message listener.
* @return Bootstrap for clients.
*/
public static Bootstrap createClientBootstrap(
EventLoopGroup eventLoopGroup,
- MessageSerializationRegistry serializationRegistry,
- BiConsumer<SocketAddress, NetworkMessage> messageListener
+ MessageSerializationRegistry serializationRegistry
) {
Bootstrap clientBootstrap = new Bootstrap();
@@ -234,16 +290,8 @@ public class ConnectionManager {
.channel(NioSocketChannel.class)
// See NettyServer#start for netty configuration details.
.option(ChannelOption.SO_KEEPALIVE, true)
- .handler(new ChannelInitializer<SocketChannel>() {
- /** {@inheritDoc} */
- @Override public void initChannel(SocketChannel ch) {
- ch.pipeline().addLast(
- new InboundDecoder(serializationRegistry),
- new MessageHandler(messageListener),
- new ChunkedWriteHandler()
- );
- }
- });
+ .option(ChannelOption.SO_LINGER, 0)
+ .option(ChannelOption.TCP_NODELAY, true);
return clientBootstrap;
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/HandshakeHandler.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/HandshakeHandler.java
new file mode 100644
index 0000000..fdbfa86
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/HandshakeHandler.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.handshake.HandshakeException;
+import org.apache.ignite.network.internal.handshake.HandshakeManager;
+
+/**
+ * Netty handler of the handshake operation.
+ */
+public class HandshakeHandler extends ChannelInboundHandlerAdapter {
+ /** Handshake manager. */
+ private final HandshakeManager manager;
+
+ /**
+ * Constructor.
+ *
+ * @param manager Handshake manager.
+ */
+ public HandshakeHandler(HandshakeManager manager) {
+ this.manager = manager;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ HandshakeAction handshakeAction = manager.init(ctx.channel());
+
+ handleHandshakeAction(handshakeAction, ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ HandshakeAction handshakeAction = manager.onConnectionOpen(ctx.channel());
+
+ handleHandshakeAction(handshakeAction, ctx);
+
+ ctx.fireChannelActive();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ HandshakeAction handshakeAction = manager.onMessage(ctx.channel(), (NetworkMessage) msg);
+
+ handleHandshakeAction(handshakeAction, ctx);
+ // No need to forward the message to the next handler as this message only matters for a handshake.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ // If this method is called, the channel has been closed before the handshake has finished, or the handshake
+ // has failed.
+ manager.handshakeFuture().completeExceptionally(
+ new HandshakeException("Channel has been closed before handshake has finished or handshake has failed")
+ );
+
+ ctx.fireChannelInactive();
+ }
+
+ /**
+ * Handle {@link HandshakeAction}.
+ *
+ * @param action Handshake action.
+ * @param ctx Netty channel context.
+ */
+ private void handleHandshakeAction(HandshakeAction action, ChannelHandlerContext ctx) {
+ switch (action) {
+ case REMOVE_HANDLER:
+ ctx.pipeline().remove(this);
+ break;
+
+ case FAIL:
+ ctx.channel().close();
+ break;
+
+ case NOOP:
+ break;
+ }
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
index 196200f..0349056 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
@@ -17,6 +17,7 @@
package org.apache.ignite.network.internal.netty;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import io.netty.buffer.ByteBuf;
@@ -122,4 +123,15 @@ public class InboundDecoder extends ByteToMessageDecoder {
}
}
}
+
+ /** {@inheritDoc} */
+ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Exception caught: " + cause.getMessage(), cause);
+
+ // Ignore IOExceptions that are thrown from Netty's internals. IOExceptions that occurred during reads
+ // or writes should be handled elsewhere.
+ if (cause instanceof Exception && !(cause instanceof IOException) )
+ throw (Exception) cause;
+ }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
index de5bf60..240e5c2 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
@@ -17,14 +17,19 @@
package org.apache.ignite.network.internal.netty;
-import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
import java.util.function.Function;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.internal.handshake.HandshakeManager;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.jetbrains.annotations.Nullable;
@@ -41,52 +46,53 @@ public class NettyClient {
/** Destination address. */
private final SocketAddress address;
- /** Future that resolves when client channel is opened. */
+ /** Future that resolves when the client finished the handshake. */
@Nullable
private volatile CompletableFuture<NettySender> clientFuture = null;
+ /** Future that resolves when the client channel is opened. */
+ private CompletableFuture<Void> channelFuture = new CompletableFuture<>();
+
/** Client channel. */
@Nullable
private volatile Channel channel = null;
+ /** Message listener. */
+ private final BiConsumer<SocketAddress, NetworkMessage> messageListener;
+
+ /** Handshake manager. */
+ private final HandshakeManager handshakeManager;
+
/** Flag indicating if {@link #stop()} has been called. */
private boolean stopped = false;
/**
* Constructor.
*
- * @param host Host.
- * @param port Port.
- * @param serializationRegistry Serialization registry.
- */
- public NettyClient(
- String host,
- int port,
- MessageSerializationRegistry serializationRegistry
- ) {
- this(new InetSocketAddress(host, port), serializationRegistry);
- }
-
- /**
- * Constructor.
- *
* @param address Destination address.
* @param serializationRegistry Serialization registry.
+ * @param manager Client handshake manager.
+ * @param messageListener Message listener.
*/
public NettyClient(
SocketAddress address,
- MessageSerializationRegistry serializationRegistry
+ MessageSerializationRegistry serializationRegistry,
+ HandshakeManager manager,
+ BiConsumer<SocketAddress, NetworkMessage> messageListener
) {
this.address = address;
this.serializationRegistry = serializationRegistry;
+ this.handshakeManager = manager;
+ this.messageListener = messageListener;
}
/**
* Start client.
*
+ * @param bootstrapTemplate Template client bootstrap.
* @return Future that resolves when client channel is opened.
*/
- public CompletableFuture<NettySender> start(Bootstrap bootstrap) {
+ public CompletableFuture<NettySender> start(Bootstrap bootstrapTemplate) {
synchronized (startStopLock) {
if (stopped)
throw new IgniteInternalException("Attempted to start an already stopped NettyClient");
@@ -94,17 +100,37 @@ public class NettyClient {
if (clientFuture != null)
throw new IgniteInternalException("Attempted to start an already started NettyClient");
+ Bootstrap bootstrap = bootstrapTemplate.clone();
+
+ bootstrap.handler(new ChannelInitializer<SocketChannel>() {
+ /** {@inheritDoc} */
+ @Override public void initChannel(SocketChannel ch) {
+ ch.pipeline().addLast(
+ new InboundDecoder(serializationRegistry),
+ new HandshakeHandler(handshakeManager),
+ new MessageHandler(messageListener),
+ new ChunkedWriteHandler(),
+ new OutboundEncoder(serializationRegistry)
+ );
+ }
+ });
+
clientFuture = NettyUtils.toChannelCompletableFuture(bootstrap.connect(address))
.handle((channel, throwable) -> {
synchronized (startStopLock) {
this.channel = channel;
+ if (throwable != null)
+ channelFuture.completeExceptionally(throwable);
+ else
+ channelFuture.complete(null);
+
if (stopped)
return CompletableFuture.<NettySender>failedFuture(new CancellationException("Client was stopped"));
else if (throwable != null)
return CompletableFuture.<NettySender>failedFuture(throwable);
else
- return CompletableFuture.completedFuture(new NettySender(channel, serializationRegistry));
+ return handshakeManager.handshakeFuture();
}
})
.thenCompose(Function.identity());
@@ -137,7 +163,7 @@ public class NettyClient {
if (clientFuture == null)
return CompletableFuture.completedFuture(null);
- return clientFuture
+ return channelFuture
.handle((sender, throwable) ->
channel == null ?
CompletableFuture.<Void>completedFuture(null) :
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettySender.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettySender.java
index 649a9e7..e8553fb 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettySender.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettySender.java
@@ -17,18 +17,12 @@
package org.apache.ignite.network.internal.netty;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedInput;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.internal.direct.DirectMessageWriter;
-import org.apache.ignite.network.serialization.MessageSerializationRegistry;
-import org.apache.ignite.network.serialization.MessageSerializer;
+import org.jetbrains.annotations.TestOnly;
/**
* Wrapper for a Netty {@link Channel}, that uses {@link ChunkedInput} and {@link DirectMessageWriter} to send data.
@@ -37,130 +31,68 @@ public class NettySender {
/** Netty channel. */
private final Channel channel;
- /** Serialization registry. */
- private final MessageSerializationRegistry serializationRegistry;
+ /** Launch id of the remote node. */
+ private final String launchId;
+
+ /** Consistent id of the remote node. */
+ private final String consistentId;
/**
* Constructor.
*
* @param channel Netty channel.
- * @param registry Serialization registry.
+ * @param launchId Launch id of the remote node.
+ * @param consistentId Consistent id of the remote node.
*/
- public NettySender(Channel channel, MessageSerializationRegistry registry) {
+ public NettySender(Channel channel, String launchId, String consistentId) {
this.channel = channel;
- serializationRegistry = registry;
+ this.launchId = launchId;
+ this.consistentId = consistentId;
}
/**
- * Send message.
+ * Sends the message.
*
* @param msg Network message.
+ * @return Future of the send operation.
*/
public CompletableFuture<Void> send(NetworkMessage msg) {
- MessageSerializer<NetworkMessage> serializer = serializationRegistry.createSerializer(msg.directType());
+ return NettyUtils.toCompletableFuture(channel.writeAndFlush(msg));
+ }
- return NettyUtils.toCompletableFuture(
- channel.writeAndFlush(new NetworkMessageChunkedInput(msg, serializer, serializationRegistry))
- );
+ /**
+ * @return Launch id of the remote node.
+ */
+ public String launchId() {
+ return launchId;
}
/**
- * Close channel.
+ * @return Consistent id of the remote node.
*/
- public void close() {
- this.channel.close().awaitUninterruptibly();
+ public String consistentId() {
+ return consistentId;
}
/**
- * @return Gets the remote address of the channel.
+ * Closes channel.
*/
- public SocketAddress remoteAddress() {
- return this.channel.remoteAddress();
+ public void close() {
+ this.channel.close().awaitUninterruptibly();
}
/**
- * @return {@code true} if channel is open, {@code false} otherwise.
+ * @return {@code true} if the channel is open, {@code false} otherwise.
*/
public boolean isOpen() {
return this.channel.isOpen();
}
/**
- * Chunked input for network message.
+ * @return Channel.
*/
- private static class NetworkMessageChunkedInput implements ChunkedInput<ByteBuf> {
- /** Network message. */
- private final NetworkMessage msg;
-
- /** Message serializer. */
- private final MessageSerializer<NetworkMessage> serializer;
-
- /** Message writer. */
- private final DirectMessageWriter writer;
-
- /** Whether the message was fully written. */
- private boolean finished = false;
-
- /**
- * Constructor.
- *
- * @param msg Network message.
- * @param serializer Serializer.
- */
- private NetworkMessageChunkedInput(
- NetworkMessage msg,
- MessageSerializer<NetworkMessage> serializer,
- MessageSerializationRegistry registry
- ) {
- this.msg = msg;
- this.serializer = serializer;
- this.writer = new DirectMessageWriter(registry, ConnectionManager.DIRECT_PROTOCOL_VERSION);
- }
-
- /** {@inheritDoc} */
- @Override public boolean isEndOfInput() throws Exception {
- return finished;
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws Exception {
-
- }
-
- /** {@inheritDoc} */
- @Deprecated
- @Override public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
- return readChunk(ctx.alloc());
- }
-
- /** {@inheritDoc} */
- @Override public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
- ByteBuf buffer = allocator.ioBuffer();
- int capacity = buffer.capacity();
-
- ByteBuffer byteBuffer = buffer.internalNioBuffer(0, capacity);
-
- int initialPosition = byteBuffer.position();
-
- writer.setBuffer(byteBuffer);
-
- finished = serializer.writeMessage(msg, writer);
-
- buffer.writerIndex(byteBuffer.position() - initialPosition);
-
- return buffer;
- }
-
- /** {@inheritDoc} */
- @Override public long length() {
- // Return negative values, because object's size is unknown.
- return -1;
- }
-
- /** {@inheritDoc} */
- @Override public long progress() {
- // Not really needed, as there won't be listeners for the write operation's progress.
- return 0;
- }
+ @TestOnly
+ public Channel channel() {
+ return channel;
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
index feb99ed..d68a370 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
@@ -33,6 +33,7 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.internal.handshake.HandshakeManager;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.jetbrains.annotations.Nullable;
@@ -63,6 +64,9 @@ public class NettyServer {
/** Incoming message listener. */
private final BiConsumer<SocketAddress, NetworkMessage> messageListener;
+ /** Handshake manager. */
+ private final HandshakeManager handshakeManager;
+
/** Server start future. */
private CompletableFuture<Void> serverStartFuture;
@@ -86,17 +90,19 @@ public class NettyServer {
* Constructor.
*
* @param port Server port.
+ * @param handshakeManager Handshake manager.
* @param newConnectionListener New connections listener.
* @param messageListener Message listener.
* @param serializationRegistry Serialization registry.
*/
public NettyServer(
int port,
+ HandshakeManager handshakeManager,
Consumer<NettySender> newConnectionListener,
BiConsumer<SocketAddress, NetworkMessage> messageListener,
MessageSerializationRegistry serializationRegistry
) {
- this(new ServerBootstrap(), port, newConnectionListener, messageListener, serializationRegistry);
+ this(new ServerBootstrap(), port, handshakeManager, newConnectionListener, messageListener, serializationRegistry);
}
/**
@@ -104,6 +110,7 @@ public class NettyServer {
*
* @param bootstrap Server bootstrap.
* @param port Server port.
+ * @param handshakeManager Handshake manager.
* @param newConnectionListener New connections listener.
* @param messageListener Message listener.
* @param serializationRegistry Serialization registry.
@@ -111,12 +118,14 @@ public class NettyServer {
public NettyServer(
ServerBootstrap bootstrap,
int port,
+ HandshakeManager handshakeManager,
Consumer<NettySender> newConnectionListener,
BiConsumer<SocketAddress, NetworkMessage> messageListener,
MessageSerializationRegistry serializationRegistry
) {
this.bootstrap = bootstrap;
this.port = port;
+ this.handshakeManager = handshakeManager;
this.newConnectionListener = newConnectionListener;
this.messageListener = messageListener;
this.serializationRegistry = serializationRegistry;
@@ -146,16 +155,23 @@ public class NettyServer {
* to read chunked data.
*/
new InboundDecoder(serializationRegistry),
+ // Handshake handler.
+ new HandshakeHandler(handshakeManager),
// Handles decoded NetworkMessages.
new MessageHandler(messageListener),
/*
* Encoder that uses org.apache.ignite.network.internal.MessageWriter
* to write chunked data.
*/
- new ChunkedWriteHandler()
+ new ChunkedWriteHandler(),
+ // Converts NetworkMessage to a NetworkMessageChunkedInput
+ new OutboundEncoder(serializationRegistry)
);
- newConnectionListener.accept(new NettySender(ch, serializationRegistry));
+ handshakeManager.handshakeFuture().whenComplete((sender, throwable) -> {
+ if (sender != null)
+ newConnectionListener.accept(sender);
+ });
}
})
/*
@@ -169,7 +185,24 @@ public class NettyServer {
* in either direction for 2 hours (NOTE: the actual value is implementation dependent),
* TCP automatically sends a keepalive probe to the peer.
*/
- .childOption(ChannelOption.SO_KEEPALIVE, true);
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ /*
+ * Specify a linger-on-close timeout. This option disables/enables immediate return from a close()
+ * of a TCP Socket. Enabling this option with a non-zero Integer timeout means that a close() will
+ * block pending the transmission and acknowledgement of all data written to the peer, at which point
+ * the socket is closed gracefully. Upon reaching the linger timeout, the socket is closed forcefully,
+ * with a TCP RST. Enabling the option with a timeout of zero does a forceful close immediately.
+ * If the specified timeout value exceeds 65,535 it will be reduced to 65,535.
+ */
+ .childOption(ChannelOption.SO_LINGER, 0)
+ /*
+ * Disable Nagle's algorithm for this connection. Written data to the network is not buffered pending
+ * acknowledgement of previously written data. Valid for TCP only. Setting this option reduces
+ * network latency and delivery time for small messages.
+ * For more information, see Socket#setTcpNoDelay(boolean)
+ * and https://en.wikipedia.org/wiki/Nagle%27s_algorithm.
+ */
+ .childOption(ChannelOption.TCP_NODELAY, true);
serverStartFuture = NettyUtils.toChannelCompletableFuture(bootstrap.bind(port))
.handle((channel, err) -> {
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyUtils.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyUtils.java
index 5c02b1a..3d8e7a0 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyUtils.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyUtils.java
@@ -58,6 +58,7 @@ public class NettyUtils {
/**
* Convert a Netty {@link Future} to a {@link CompletableFuture}.
*
+ * @param <T> Type of the future.
* @param future Future.
* @return CompletableFuture.
*/
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettySender.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/OutboundEncoder.java
similarity index 76%
copy from modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettySender.java
copy to modules/network/src/main/java/org/apache/ignite/network/internal/netty/OutboundEncoder.java
index 649a9e7..2e249e1 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettySender.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/OutboundEncoder.java
@@ -17,13 +17,12 @@
package org.apache.ignite.network.internal.netty;
-import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.util.concurrent.CompletableFuture;
+import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.stream.ChunkedInput;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.internal.direct.DirectMessageWriter;
@@ -31,58 +30,26 @@ import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.network.serialization.MessageSerializer;
/**
- * Wrapper for a Netty {@link Channel}, that uses {@link ChunkedInput} and {@link DirectMessageWriter} to send data.
+ * An encoder for the outbound messages that uses {@link DirectMessageWriter}.
*/
-public class NettySender {
- /** Netty channel. */
- private final Channel channel;
-
+public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
/** Serialization registry. */
private final MessageSerializationRegistry serializationRegistry;
/**
* Constructor.
*
- * @param channel Netty channel.
* @param registry Serialization registry.
*/
- public NettySender(Channel channel, MessageSerializationRegistry registry) {
- this.channel = channel;
+ public OutboundEncoder(MessageSerializationRegistry registry) {
serializationRegistry = registry;
}
- /**
- * Send message.
- *
- * @param msg Network message.
- */
- public CompletableFuture<Void> send(NetworkMessage msg) {
+ /** {@inheritDoc} */
+ @Override protected void encode(ChannelHandlerContext ctx, NetworkMessage msg, List<Object> out) throws Exception {
MessageSerializer<NetworkMessage> serializer = serializationRegistry.createSerializer(msg.directType());
- return NettyUtils.toCompletableFuture(
- channel.writeAndFlush(new NetworkMessageChunkedInput(msg, serializer, serializationRegistry))
- );
- }
-
- /**
- * Close channel.
- */
- public void close() {
- this.channel.close().awaitUninterruptibly();
- }
-
- /**
- * @return Gets the remote address of the channel.
- */
- public SocketAddress remoteAddress() {
- return this.channel.remoteAddress();
- }
-
- /**
- * @return {@code true} if channel is open, {@code false} otherwise.
- */
- public boolean isOpen() {
- return this.channel.isOpen();
+ out.add(new NetworkMessageChunkedInput(msg, serializer, serializationRegistry));
}
/**
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/RecoveryClientHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/RecoveryClientHandshakeManager.java
new file mode 100644
index 0000000..a0bf0e1
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/RecoveryClientHandshakeManager.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.handshake.HandshakeException;
+import org.apache.ignite.network.internal.handshake.HandshakeManager;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.internal.netty.NettyUtils;
+import org.apache.ignite.network.internal.recovery.message.HandshakeMessageFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+
+/**
+ * Recovery protocol handshake manager for a client.
+ */
+public class RecoveryClientHandshakeManager implements HandshakeManager {
+ /** Launch id. */
+ private final UUID launchId;
+
+ /** Consistent id. */
+ private final String consistentId;
+
+ /** Handshake completion future. */
+ private final CompletableFuture<NettySender> handshakeCompleteFuture = new CompletableFuture<>();
+
+ /**
+ * Constructor.
+ *
+ * @param launchId Launch id.
+ * @param consistentId Consistent id.
+ */
+ public RecoveryClientHandshakeManager(UUID launchId, String consistentId) {
+ this.launchId = launchId;
+ this.consistentId = consistentId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HandshakeAction onMessage(Channel channel, NetworkMessage message) {
+ if (message instanceof HandshakeStartMessage) {
+ HandshakeStartMessage msg = (HandshakeStartMessage) message;
+
+ HandshakeStartResponseMessage response = HandshakeMessageFactory.handshakeStartResponseMessage()
+ .launchId(launchId)
+ .consistentId(consistentId)
+ .receivedCount(0)
+ .connectionsCount(0)
+ .build();
+
+ ChannelFuture sendFuture = channel.writeAndFlush(response);
+
+ NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
+ if (throwable != null)
+ handshakeCompleteFuture.completeExceptionally(
+ new HandshakeException("Failed to send handshake response: " + throwable.getMessage(), throwable)
+ );
+ else
+ handshakeCompleteFuture.complete(new NettySender(channel, msg.launchId().toString(), msg.consistentId()));
+ });
+
+ return HandshakeAction.REMOVE_HANDLER;
+ }
+
+ handshakeCompleteFuture.completeExceptionally(
+ new HandshakeException("Unexpected message during handshake: " + message.toString())
+ );
+
+ return HandshakeAction.FAIL;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<NettySender> handshakeFuture() {
+ return handshakeCompleteFuture;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HandshakeAction init(Channel channel) {
+ return HandshakeAction.NOOP;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HandshakeAction onConnectionOpen(Channel channel) {
+ return HandshakeAction.NOOP;
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/RecoveryServerHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/RecoveryServerHandshakeManager.java
new file mode 100644
index 0000000..61f1a6e
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/RecoveryServerHandshakeManager.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.handshake.HandshakeException;
+import org.apache.ignite.network.internal.handshake.HandshakeManager;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.internal.netty.NettyUtils;
+import org.apache.ignite.network.internal.recovery.message.HandshakeMessageFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+
+/**
+ * Recovery protocol handshake manager for a server.
+ */
+public class RecoveryServerHandshakeManager implements HandshakeManager {
+ /** Launch id. */
+ private final UUID launchId;
+
+ /** Consistent id. */
+ private final String consistentId;
+
+ /** Handshake completion future. */
+ private final CompletableFuture<NettySender> handshakeCompleteFuture = new CompletableFuture<>();
+
+ /**
+ * Constructor.
+ *
+ * @param launchId Launch id.
+ * @param consistentId Consistent id.
+ */
+ public RecoveryServerHandshakeManager(UUID launchId, String consistentId) {
+ this.launchId = launchId;
+ this.consistentId = consistentId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HandshakeAction init(Channel channel) {
+ return HandshakeAction.NOOP;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HandshakeAction onConnectionOpen(Channel channel) {
+ HandshakeStartMessage handshakeStartMessage = HandshakeMessageFactory.handshakeStartMessage()
+ .launchId(launchId)
+ .consistentId(consistentId)
+ .build();
+
+ ChannelFuture sendFuture = channel.writeAndFlush(handshakeStartMessage);
+
+ NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
+ if (throwable != null)
+ handshakeCompleteFuture.completeExceptionally(
+ new HandshakeException("Failed to send handshake start message: " + throwable.getMessage(), throwable)
+ );
+ });
+
+ return HandshakeAction.NOOP;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HandshakeAction onMessage(Channel channel, NetworkMessage message) {
+ if (message instanceof HandshakeStartResponseMessage) {
+ HandshakeStartResponseMessage msg = (HandshakeStartResponseMessage) message;
+
+ handshakeCompleteFuture.complete(new NettySender(channel, msg.launchId().toString(), msg.consistentId()));
+
+ return HandshakeAction.REMOVE_HANDLER;
+ }
+
+ handshakeCompleteFuture.completeExceptionally(
+ new HandshakeException("Unexpected message during handshake: " + message.toString())
+ );
+
+ return HandshakeAction.FAIL;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<NettySender> handshakeFuture() {
+ return handshakeCompleteFuture;
+ }
+}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeMessageFactory.java
similarity index 67%
copy from modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
copy to modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeMessageFactory.java
index b28c42d..bde8e55 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeMessageFactory.java
@@ -14,12 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.network;
-/** */
-public interface ClusterServiceFactory {
- /**
- * Creates a new {@link ClusterService} using the provided context. The created network will not be in the "started" state.
- */
- ClusterService createClusterService(ClusterLocalConfiguration context);
+package org.apache.ignite.network.internal.recovery.message;
+/** Static factory of builders for the handshake messages. */
+public class HandshakeMessageFactory {
+ /** @return New builder of a {@link HandshakeStartMessage}. */
+ public static HandshakeStartMessage.Builder handshakeStartMessage() {
+ return new HandshakeStartMessageImpl();
+ }
+ /** @return New builder of a {@link HandshakeStartResponseMessage}. */
+ public static HandshakeStartResponseMessage.Builder handshakeStartResponseMessage() {
+ return new HandshakeStartResponseMessageImpl();
+ }
}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartMessage.java
similarity index 59%
copy from modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
copy to modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartMessage.java
index dc6de9b..ee4bb2f 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartMessage.java
@@ -15,30 +15,36 @@
* limitations under the License.
*/
-package org.apache.ignite.network;
+package org.apache.ignite.network.internal.recovery.message;
-import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.processor.annotations.AutoSerializable;
-@AutoSerializable(messageFactory = TestMessageFactory.class)
-public interface TestMessage extends NetworkMessage {
- /** Visible type for tests. */
- public static final short TYPE = 3;
-
- String msg();
+/**
+ * Handshake start message: carries a launch id and a consistent id.
+ */
+@AutoSerializable(messageFactory = HandshakeMessageFactory.class)
+public interface HandshakeStartMessage extends NetworkMessage {
+ /** Message type, returned by {@link #directType()}. */
+ public static final byte TYPE = 2;
- Map<Integer, String> map();
+ /** Launch id. */
+ UUID launchId();
- /** {@inheritDoc} */
- @Override public default short directType() {
- return TYPE;
- }
+ /** Consistent id. */
+ String consistentId();
interface Builder {
- Builder msg(String msg);
+ HandshakeStartMessage.Builder launchId(UUID launchId);
- Builder map(Map<Integer, String> map);
+ HandshakeStartMessage.Builder consistentId(String consistentId);
- TestMessage build();
+ HandshakeStartMessage build();
+ }
+
+ /** {@inheritDoc} */
+ @Override default short directType() {
+ return TYPE;
}
}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartMessageImpl.java
similarity index 50%
copy from modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
copy to modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartMessageImpl.java
index dc6de9b..fdde622 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartMessageImpl.java
@@ -15,30 +15,41 @@
* limitations under the License.
*/
-package org.apache.ignite.network;
+package org.apache.ignite.network.internal.recovery.message;
-import java.util.Map;
-import org.apache.ignite.network.processor.annotations.AutoSerializable;
+import java.util.UUID;
-@AutoSerializable(messageFactory = TestMessageFactory.class)
-public interface TestMessage extends NetworkMessage {
- /** Visible type for tests. */
- public static final short TYPE = 3;
+public class HandshakeStartMessageImpl implements HandshakeStartMessage, HandshakeStartMessage.Builder {
+ /** Launch id. */
+ private UUID launchId;
- String msg();
+ /** Consistent id. */
+ private String consistentId;
- Map<Integer, String> map();
+ /** {@inheritDoc} */
+ @Override public UUID launchId() {
+ return launchId;
+ }
/** {@inheritDoc} */
- @Override public default short directType() {
- return TYPE;
+ @Override public String consistentId() {
+ return consistentId;
}
- interface Builder {
- Builder msg(String msg);
+ /** {@inheritDoc} */
+ @Override public Builder launchId(UUID launchId) {
+ this.launchId = launchId;
+ return this;
+ }
- Builder map(Map<Integer, String> map);
+ /** {@inheritDoc} */
+ @Override public Builder consistentId(String consistentId) {
+ this.consistentId = consistentId;
+ return this;
+ }
- TestMessage build();
+ /** {@inheritDoc} */
+ @Override public HandshakeStartMessage build() {
+ return this; // The builder and the built message are the same object.
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartResponseMessage.java b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartResponseMessage.java
new file mode 100644
index 0000000..b7b69a2
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartResponseMessage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery.message;
+
+import java.util.UUID;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.processor.annotations.AutoSerializable;
+
+/**
+ * Handshake start response message: carries a launch id, a consistent id, a received-messages count and a connections count.
+ */
+@AutoSerializable(messageFactory = HandshakeMessageFactory.class)
+public interface HandshakeStartResponseMessage extends NetworkMessage {
+ /** Message type, returned by {@link #directType()}. */
+ public static final byte TYPE = 3;
+
+ /** Launch id. */
+ UUID launchId();
+
+ /** Consistent id. */
+ String consistentId();
+
+ /** Number of received messages. */
+ long receivedCount();
+
+ /** Connections count. */
+ long connectionsCount();
+ /** Builder for {@link HandshakeStartResponseMessage}. */
+ interface Builder {
+ HandshakeStartResponseMessage.Builder launchId(UUID launchId);
+
+ HandshakeStartResponseMessage.Builder consistentId(String consistentId);
+
+ HandshakeStartResponseMessage.Builder receivedCount(long receivedCount);
+
+ HandshakeStartResponseMessage.Builder connectionsCount(long connectionsCount);
+
+ HandshakeStartResponseMessage build();
+ }
+
+ /** {@inheritDoc} */
+ @Override default short directType() {
+ return TYPE;
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartResponseMessageImpl.java b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartResponseMessageImpl.java
new file mode 100644
index 0000000..c9bdf5f
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartResponseMessageImpl.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery.message;
+
+import java.util.UUID;
+/** {@link HandshakeStartResponseMessage} implementation that doubles as its own {@link HandshakeStartResponseMessage.Builder}. */
+public class HandshakeStartResponseMessageImpl implements HandshakeStartResponseMessage, HandshakeStartResponseMessage.Builder {
+ /** Launch id. */
+ private UUID launchId;
+
+ /** Consistent id. */
+ private String consistentId;
+
+ /** Number of received messages. */
+ private long rcvCnt;
+
+ /** Connections count. */
+ private long connCnt;
+
+ /** {@inheritDoc} */
+ @Override public UUID launchId() {
+ return launchId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String consistentId() {
+ return consistentId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long receivedCount() {
+ return rcvCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long connectionsCount() {
+ return connCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Builder launchId(UUID launchId) {
+ this.launchId = launchId;
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Builder consistentId(String consistentId) {
+ this.consistentId = consistentId;
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Builder receivedCount(long rcvCnt) {
+ this.rcvCnt = rcvCnt;
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Builder connectionsCount(long connectCnt) {
+ this.connCnt = connectCnt;
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HandshakeStartResponseMessage build() {
+ return this; // The builder and the built message are the same object.
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 25de258..396a3d0 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -19,6 +19,7 @@ package org.apache.ignite.network.scalecube;
import java.lang.management.ManagementFactory;
import java.util.List;
+import java.util.UUID;
import java.util.stream.Collectors;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanRegistrationException;
@@ -37,6 +38,8 @@ import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
import org.apache.ignite.network.NetworkConfigurationException;
import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.internal.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.network.internal.recovery.RecoveryServerHandshakeManager;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
/**
@@ -46,15 +49,25 @@ import org.apache.ignite.network.serialization.MessageSerializationRegistry;
public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
/** {@inheritDoc} */
@Override public ClusterService createClusterService(ClusterLocalConfiguration context) {
+ String consistentId = context.getName();
+
var topologyService = new ScaleCubeTopologyService();
var messagingService = new ScaleCubeMessagingService(topologyService);
MessageSerializationRegistry registry = context.getSerializationRegistry();
- var connectionManager = new ConnectionManager(context.getPort(), registry);
+ UUID launchId = UUID.randomUUID();
+
+ var connectionManager = new ConnectionManager(
+ context.getPort(),
+ registry,
+ consistentId,
+ () -> new RecoveryServerHandshakeManager(launchId, consistentId),
+ () -> new RecoveryClientHandshakeManager(launchId, consistentId)
+ );
- ScaleCubeDirectMarshallerTransport transport = new ScaleCubeDirectMarshallerTransport(connectionManager);
+ ScaleCubeDirectMarshallerTransport transport = new ScaleCubeDirectMarshallerTransport(connectionManager, topologyService);
var cluster = new ClusterImpl(defaultConfig())
.handler(cl -> new ClusterMessageHandler() {
@@ -68,7 +81,7 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
topologyService.onMembershipEvent(event);
}
})
- .config(opts -> opts.memberAlias(context.getName()))
+ .config(opts -> opts.memberAlias(consistentId))
.transport(opts -> opts.transportFactory(new DelegatingTransportFactory(messagingService, config -> transport)))
.membership(opts -> opts.seedMembers(parseAddresses(context.getMemberAddresses())));
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
index ffa075a..235ef04 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
@@ -33,6 +33,7 @@ import io.scalecube.net.Address;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.internal.netty.ConnectionManager;
import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
import org.apache.ignite.network.scalecube.message.ScaleCubeMessageFactory;
@@ -67,6 +68,9 @@ public class ScaleCubeDirectMarshallerTransport implements Transport {
/** Connection manager. */
private final ConnectionManager connectionManager;
+ /** */
+ private final ScaleCubeTopologyService topologyService;
+
/** Node address. */
private Address address;
@@ -74,9 +78,12 @@ public class ScaleCubeDirectMarshallerTransport implements Transport {
* Constructor.
*
* @param connectionManager Connection manager.
+ * @param topologyService Topology service.
*/
- public ScaleCubeDirectMarshallerTransport(ConnectionManager connectionManager) {
+ public ScaleCubeDirectMarshallerTransport(ConnectionManager connectionManager, ScaleCubeTopologyService topologyService) {
this.connectionManager = connectionManager;
+ this.topologyService = topologyService;
+
this.connectionManager.addListener(this::onMessage);
// Setup cleanup
stop.then(doStop())
@@ -150,7 +157,11 @@ public class ScaleCubeDirectMarshallerTransport implements Transport {
var addr = InetSocketAddress.createUnresolved(address.host(), address.port());
return Mono.fromFuture(() -> {
- return connectionManager.channel(addr).thenCompose(client -> client.send(fromMessage(message)));
+ ClusterNode node = topologyService.getByAddress(address.toString());
+
+ String consistentId = node != null ? node.name() : null;
+
+ return connectionManager.channel(consistentId, addr).thenCompose(client -> client.send(fromMessage(message)));
});
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
index 0dab0e1..c7db910 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
@@ -17,12 +17,12 @@
package org.apache.ignite.network.scalecube;
-import java.time.Duration;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import io.scalecube.cluster.Cluster;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.network.AbstractMessagingService;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.MessagingService;
@@ -43,13 +43,19 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
*/
private ScaleCubeTopologyService topologyService;
- /** */
+ /**
+ * Constructor.
+ *
+ * @param topologyService Topology service.
+ */
ScaleCubeMessagingService(ScaleCubeTopologyService topologyService) {
this.topologyService = topologyService;
}
/**
* Sets the ScaleCube's {@link Cluster}. Needed for cyclic dependency injection.
+ *
+ * @param cluster Cluster.
*/
void setCluster(Cluster cluster) {
this.cluster = cluster;
@@ -57,6 +63,8 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
/**
* Delegates the received message to the registered message handlers.
+ *
+ * @param message Received message.
*/
void fireEvent(Message message) {
NetworkMessage msg = message.data();
@@ -110,6 +118,9 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
/**
* Extracts the given node's {@link Address}.
+ *
+ * @param node Node.
+ * @return Node's address.
*/
private static Address clusterNodeAddress(ClusterNode node) {
return Address.create(node.host(), node.port());
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
index 4f41fd0..4886b61 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
@@ -39,6 +39,8 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
/**
* Sets the ScaleCube's local {@link Member}.
+ *
+ * @param member Local member.
*/
void setLocalMember(Member member) {
localMember = fromMember(member);
@@ -49,6 +51,8 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
/**
* Delegates the received topology event to the registered event handlers.
+ *
+ * @param event Membership event.
*/
void onMembershipEvent(MembershipEvent event) {
ClusterNode member = fromMember(event.member());
diff --git a/modules/network/src/test/java/org/apache/ignite/network/internal/netty/InboundDecoderTest.java b/modules/network/src/test/java/org/apache/ignite/network/internal/netty/InboundDecoderTest.java
index 664771d..135857f 100644
--- a/modules/network/src/test/java/org/apache/ignite/network/internal/netty/InboundDecoderTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/network/internal/netty/InboundDecoderTest.java
@@ -28,6 +28,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.TestMessage;
import org.apache.ignite.network.TestMessageFactory;
import org.apache.ignite.network.TestMessageSerializationFactory;
@@ -35,7 +36,6 @@ import org.apache.ignite.network.internal.AllTypesMessage;
import org.apache.ignite.network.internal.AllTypesMessageGenerator;
import org.apache.ignite.network.internal.AllTypesMessageSerializationFactory;
import org.apache.ignite.network.internal.direct.DirectMessageWriter;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.network.serialization.MessageSerializer;
import org.junit.jupiter.api.Test;
@@ -194,7 +194,7 @@ public class InboundDecoderTest {
}
/**
- * Source of parameters for the {@link #test(long)} method.
+ * Source of parameters for the {@link #testAllTypes(long)} method.
* Creates seeds for a {@link AllTypesMessage} generation.
* @return Random seeds.
*/
diff --git a/modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyClientTest.java b/modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyClientTest.java
index 40436f6..e6adffa 100644
--- a/modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyClientTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyClientTest.java
@@ -17,9 +17,6 @@
package org.apache.ignite.network.internal.netty;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.embedded.EmbeddedChannel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
@@ -27,7 +24,14 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.handshake.HandshakeManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -156,11 +160,16 @@ public class NettyClientTest {
public void testStartTwice() throws Exception {
var channel = new EmbeddedChannel();
- Bootstrap bootstrap = Mockito.mock(Bootstrap.class);
+ Bootstrap bootstrap = mockBootstrap();
Mockito.doReturn(channel.newSucceededFuture()).when(bootstrap).connect(Mockito.any());
- client = new NettyClient(address, null);
+ client = new NettyClient(
+ address,
+ null,
+ new MockClientHandshakeManager(channel),
+ (address1, message) -> {}
+ );
client.start(bootstrap);
@@ -176,9 +185,14 @@ public class NettyClientTest {
* @return Client and a NettySender future.
*/
private ClientAndSender createClientAndSenderFromChannelFuture(ChannelFuture future) {
- var client = new NettyClient(address, null);
+ var client = new NettyClient(
+ address,
+ null,
+ new MockClientHandshakeManager(future.channel()),
+ (address1, message) -> {}
+ );
- Bootstrap bootstrap = Mockito.mock(Bootstrap.class);
+ Bootstrap bootstrap = mockBootstrap();
Mockito.doReturn(future).when(bootstrap).connect(Mockito.any());
@@ -186,6 +200,19 @@ public class NettyClientTest {
}
/**
+ * Create mock of a {@link Bootstrap} that implements {@link Bootstrap#clone()}.
+ *
+ * @return Mocked bootstrap.
+ */
+ private Bootstrap mockBootstrap() {
+ Bootstrap bootstrap = Mockito.mock(Bootstrap.class);
+ // clone() hands back the same mock, so stubbing applied to it by the caller (e.g. connect) also holds for the "clone".
+ Mockito.doReturn(bootstrap).when(bootstrap).clone();
+
+ return bootstrap;
+ }
+
+ /**
* Tuple for a NettyClient and a future of a NettySender.
*/
private static class ClientAndSender {
@@ -206,4 +233,37 @@ public class NettyClientTest {
this.sender = sender;
}
}
+
+ /**
+ * Client handshake manager that doesn't do any actual handshaking.
+ */
+ private static class MockClientHandshakeManager implements HandshakeManager {
+ /** Sender that the already-completed handshake future returns. */
+ private final NettySender sender;
+
+ /** Creates a manager whose sender wraps the given channel with empty launch id and consistent id strings. */
+ private MockClientHandshakeManager(Channel channel) {
+ this.sender = new NettySender(channel, "", "");
+ }
+
+ /** {@inheritDoc} */
+ @Override public HandshakeAction onMessage(Channel channel, NetworkMessage message) {
+ return HandshakeAction.REMOVE_HANDLER;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<NettySender> handshakeFuture() {
+ return CompletableFuture.completedFuture(sender); // Already completed: no handshake messages are exchanged.
+ }
+
+ /** {@inheritDoc} */
+ @Override public HandshakeAction init(Channel channel) {
+ return HandshakeAction.NOOP;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HandshakeAction onConnectionOpen(Channel channel) {
+ return HandshakeAction.NOOP;
+ }
+ }
}
diff --git a/modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyServerTest.java
index 1a3af63..fc87d35 100644
--- a/modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyServerTest.java
@@ -17,18 +17,34 @@
package org.apache.ignite.network.internal.netty;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ServerChannel;
import io.netty.channel.embedded.EmbeddedChannel;
-import java.nio.channels.ClosedChannelException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.handshake.HandshakeManager;
+import org.apache.ignite.network.serialization.MessageDeserializer;
+import org.apache.ignite.network.serialization.MessageMappingException;
+import org.apache.ignite.network.serialization.MessageReader;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
+import org.mockito.InOrder;
import org.mockito.Mockito;
+import org.mockito.verification.VerificationMode;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -148,9 +164,101 @@ public class NettyServerTest {
}
/**
+ * Tests that handshake manager is invoked upon a client connecting to a server.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testHandshakeManagerInvoked() throws Exception {
+ HandshakeManager handshakeManager = Mockito.mock(HandshakeManager.class);
+ // Stub every HandshakeManager method: an already-completed future and NOOP actions.
+ Mockito.doReturn(CompletableFuture.completedFuture(Mockito.mock(NettySender.class)))
+ .when(handshakeManager).handshakeFuture();
+
+ Mockito.doReturn(HandshakeAction.NOOP)
+ .when(handshakeManager).init(Mockito.any());
+
+ Mockito.doReturn(HandshakeAction.NOOP)
+ .when(handshakeManager).onConnectionOpen(Mockito.any());
+
+ Mockito.doReturn(HandshakeAction.NOOP)
+ .when(handshakeManager).onMessage(Mockito.any(), Mockito.any());
+ // Registry stub: readMessage() always returns true and getMessage() yields an anonymous message with direct type 0.
+ MessageSerializationRegistry registry = new MessageSerializationRegistry() {
+ /** {@inheritDoc} */
+ @Override public <T extends NetworkMessage> MessageDeserializer<T> createDeserializer(short type) {
+ return (MessageDeserializer<T>) new MessageDeserializer<>() {
+ /** {@inheritDoc} */
+ @Override public boolean readMessage(MessageReader reader) throws MessageMappingException {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Class<NetworkMessage> klass() {
+ return NetworkMessage.class;
+ }
+
+ /** {@inheritDoc} */
+ @Override public NetworkMessage getMessage() {
+ return new NetworkMessage() {
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 0;
+ }
+ };
+ }
+ };
+ }
+ };
+
+ server = new NettyServer(4000, handshakeManager, sender -> {}, (socketAddress, message) -> {}, registry);
+
+ server.start().get(3, TimeUnit.SECONDS);
+ // Connect a raw client with an empty pipeline. NOTE(review): this NioEventLoopGroup is never shut down in this method.
+ CompletableFuture<Channel> connectFut = NettyUtils.toChannelCompletableFuture(
+ new Bootstrap()
+ .channel(NioSocketChannel.class)
+ .group(new NioEventLoopGroup())
+ .handler(new ChannelInitializer<>() {
+ /** {@inheritDoc} */
+ @Override protected void initChannel(Channel ch) throws Exception {
+ // No-op.
+ }
+ })
+ .connect(server.address())
+ );
+
+ Channel channel = connectFut.get(3, TimeUnit.SECONDS);
+
+ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+
+ // One message only: DIRECT_TYPE_SIZE + 1 bytes, each set to 1.
+ for (int i = 0; i < (NetworkMessage.DIRECT_TYPE_SIZE + 1); i++)
+ buffer.writeByte(1);
+
+ channel.writeAndFlush(buffer).get(3, TimeUnit.SECONDS);
+
+ channel.close().get(3, TimeUnit.SECONDS);
+
+ InOrder order = Mockito.inOrder(handshakeManager);
+ // The handshake manager methods must have been invoked in exactly this order.
+ order.verify(handshakeManager, timeout()).init(Mockito.any());
+ order.verify(handshakeManager, timeout()).handshakeFuture();
+ order.verify(handshakeManager, timeout()).onConnectionOpen(Mockito.any());
+ order.verify(handshakeManager, timeout()).onMessage(Mockito.any(), Mockito.any());
+ }
+
+ /**
+ * @return Verification mode with a 3-second timeout.
+ */
+ private static VerificationMode timeout() {
+ return Mockito.timeout(TimeUnit.SECONDS.toMillis(3));
+ }
+
+ /**
* Creates a server from a backing {@link ChannelFuture}.
*
- * @param channel Server channel.
+ * @param future Server channel future.
* @param shouldStart {@code true} if a server should start successfully
* @return NettyServer.
* @throws Exception If failed.
@@ -160,7 +268,7 @@ public class NettyServerTest {
Mockito.doReturn(future).when(bootstrap).bind(Mockito.anyInt());
- var server = new NettyServer(bootstrap, 0, null, null, null);
+ var server = new NettyServer(bootstrap, 0, Mockito.mock(HandshakeManager.class), null, null, null);
try {
server.start().get(3, TimeUnit.SECONDS);
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java
index 6d04917..ae3074e 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java
@@ -23,6 +23,10 @@ import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
import org.apache.ignite.network.scalecube.message.ScaleCubeMessageSerializationFactory;
@@ -55,7 +59,9 @@ class ITRaftCounterServerTest {
/** */
private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry()
- .registerFactory(ScaleCubeMessage.TYPE, new ScaleCubeMessageSerializationFactory());
+ .registerFactory(ScaleCubeMessage.TYPE, new ScaleCubeMessageSerializationFactory())
+ .registerFactory(HandshakeStartMessage.TYPE, new HandshakeStartMessageSerializationFactory())
+ .registerFactory(HandshakeStartResponseMessage.TYPE, new HandshakeStartResponseMessageSerializationFactory());
/** */
private RaftServer server;
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index be41858..8e65067 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -46,8 +46,8 @@ import org.apache.ignite.internal.vault.impl.VaultServiceImpl;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryInitializer;
import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
-import org.apache.ignite.network.scalecube.message.MessageSerializationRegistryInitializer;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.utils.IgniteProperties;
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
index c8864ec..a626ded 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
@@ -44,6 +44,10 @@ import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
import org.apache.ignite.network.scalecube.message.ScaleCubeMessageSerializationFactory;
@@ -91,7 +95,9 @@ public class ITDistributedTableTest {
/** */
private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry()
- .registerFactory(ScaleCubeMessage.TYPE, new ScaleCubeMessageSerializationFactory());
+ .registerFactory(ScaleCubeMessage.TYPE, new ScaleCubeMessageSerializationFactory())
+ .registerFactory(HandshakeStartMessage.TYPE, new HandshakeStartMessageSerializationFactory())
+ .registerFactory(HandshakeStartResponseMessage.TYPE, new HandshakeStartResponseMessageSerializationFactory());
/** Client. */
private ClusterService client;