Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/05/13 10:46:17 UTC

[GitHub] [ignite-3] sashapolo commented on a diff in pull request #804: IGNITE-14085 Introduce network recovery protocol

sashapolo commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r871562173


##########
modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java:
##########
@@ -28,29 +27,23 @@
  */
 public interface HandshakeManager {
     /**
-     * Initialize handshake manager with the channel.
+     * Initializes handshake manager with the channel.
      *
-     * @param channel Channel.
-     * @return Action to perform by {@link HandshakeHandler}.
+     * @param handlerContext Channel handler context.
      */
-    HandshakeResult init(Channel channel);
+    void onInit(ChannelHandlerContext handlerContext);

Review Comment:
   Why did you rename this method?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -75,40 +79,41 @@ public class ConnectionManager {
     /** Node consistent id. */
     private final String consistentId;
 
-    /** Client handshake manager factory. */
-    private final Supplier<HandshakeManager> clientHandshakeManagerFactory;
+    /** Node launch id. As opposed to {@link #consistentId}, this identifier changes between restarts. */
+    private final UUID launchId;
 
     /** Start flag. */
     private final AtomicBoolean started = new AtomicBoolean(false);
 
     /** Stop flag. */
     private final AtomicBoolean stopped = new AtomicBoolean(false);
 
+    /** Recovery descriptor provider. */
+    private final RecoveryDescriptorProvider descriptorProvider = new DefaultRecoveryDescriptorProvider();

Review Comment:
   I would suggest renaming `RecoveryDescriptorProvider` to `RecoveryDescriptorFactory`.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -284,6 +289,14 @@ public boolean isStopped() {
         return stopped.get();
     }
 
+    protected HandshakeManager createClientHandshakeManager(short connectionId) {
+        return new RecoveryClientHandshakeManager(launchId, consistentId, connectionId, FACTORY, descriptorProvider);
+    }
+
+    protected HandshakeManager createServerHandshakeManager() {

Review Comment:
   ```suggestion
       private HandshakeManager createServerHandshakeManager() {
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -221,17 +226,17 @@ private void onNewIncomingChannel(NettySender channel) {
      * @param address Target address.
      * @return New netty client.
      */
-    private NettyClient connect(SocketAddress address) {
+    private NettyClient connect(SocketAddress address, short connectionId) {
         var client = new NettyClient(
                 address,
                 serializationService,
-                clientHandshakeManagerFactory.get(),
+                createClientHandshakeManager(connectionId),
                 this::onMessage
         );
 
-        client.start(clientBootstrap).whenComplete((sender, throwable) -> {
+        client.start(clientBootstrap).whenComplete((nodeInfo, throwable) -> {
             if (throwable == null) {
-                channels.put(sender.consistentId(), sender);
+                // No-op

Review Comment:
   I guess we can simply write:
   ```
   if (throwable != null) {
       clients.remove(address);
   }
   ```
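
   A minimal sketch of that simplification, using only `CompletableFuture`. The `clients` map, its `String` keys and values, and the `track` helper are hypothetical stand-ins for illustration, not the actual `ConnectionManager` members:

   ```java
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;

   final class ConnectCleanupSketch {
       /** Hypothetical stand-in for the map of started clients, keyed by address. */
       static final Map<String, String> clients = new ConcurrentHashMap<>();

       /** Registers a client and removes it again only if its start future fails. */
       static CompletableFuture<String> track(String address, CompletableFuture<String> startFuture) {
           clients.put(address, "client-for-" + address);

           return startFuture.whenComplete((result, throwable) -> {
               // No empty success branch: only the failure case needs handling.
               if (throwable != null) {
                   clients.remove(address);
               }
           });
       }
   }
   ```

   With the success branch gone, the callback only has to undo the registration when the start fails.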



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Default implementation of the {@link RecoveryDescriptorProvider}.
+ */
+public class DefaultRecoveryDescriptorProvider implements RecoveryDescriptorProvider {
+    // TODO: IGNITE-16954 Make this configurable
+    private static final int DEFAULT_QUEUE_LIMIT = 10;
+
+    /** Recovery descriptors. */
+    private final Map<ChannelKey, RecoveryDescriptor> recoveryDescriptors = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override
+    public RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID launchId, short connectionIndex, boolean inbound) {
+        var key = new ChannelKey(consistentId, launchId, connectionIndex, inbound);
+
+        return recoveryDescriptors.computeIfAbsent(key, channelKey -> new RecoveryDescriptor(DEFAULT_QUEUE_LIMIT));
+    }
+
+    /** Channel key. */
+    private static class ChannelKey {
+        /** Remote node's consistent id. */
+        private final String consistentId;
+
+        /** Remote node's launch id. */
+        private final UUID launchId;
+
+        /**
+         * Connection id. Every connection between this node and a remote node has a unique connection id,
+         * but connections with different nodes may have same ids.

Review Comment:
   ```suggestion
            * but connections with different nodes may have the same ids.
   ```



##########
modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java:
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
+import org.apache.ignite.internal.network.serialization.SerializationService;
+import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
+import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.OutNetworkObject;
+import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
+import org.apache.ignite.network.TestMessagesFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Recovery protocol handshake flow test.
+ */
+public class RecoveryHandshakeTest {
+    /** Connection id. */
+    private static final short CONNECTION_ID = 1337;
+
+    /** Serialization registry. */
+    private static final MessageSerializationRegistry MESSAGE_REGISTRY = new TestMessageSerializationRegistryImpl();
+
+    /** Message factory. */
+    private static final NetworkMessagesFactory MESSAGE_FACTORY = new NetworkMessagesFactory();
+
+    /** Test message factory. */
+    private static final TestMessagesFactory TEST_MESSAGES_FACTORY = new TestMessagesFactory();
+
+    @Test
+    public void testHandshake() throws Exception {
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        var clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);

Review Comment:
   Please check the `var` usages in this class; many of them are illegal.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -60,6 +63,7 @@ public class ConnectionManager {
     /** Server. */
     private final NettyServer server;
 
+    // TODO: IGNITE-16948 Should be a map consistentId -> connectionId -> sender

Review Comment:
   IGNITE-16948 does not describe why this should be a different map



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -221,17 +226,17 @@ private void onNewIncomingChannel(NettySender channel) {
      * @param address Target address.
      * @return New netty client.
      */
-    private NettyClient connect(SocketAddress address) {
+    private NettyClient connect(SocketAddress address, short connectionId) {

Review Comment:
   The `connectionId` parameter is always 0. Do I understand correctly that this will change under IGNITE-16948?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/BaseRecoveryHandshakeManager.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.netty.HandshakeHandler;
+import org.apache.ignite.internal.network.netty.MessageHandler;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Base recovery handshake manager.
+ */
+public abstract class BaseRecoveryHandshakeManager implements HandshakeManager {
+    /** Handshake completion future. */
+    protected final CompletableFuture<NettySender> handshakeCompleteFuture = new CompletableFuture<>();
+
+    /** Remote node's launch id. */
+    protected UUID remoteLaunchId;
+
+    /** Remote node's consistent id. */
+    protected String remoteConsistentId;
+
+    /** Connection id. */
+    protected short connectionId;
+
+    /** Netty pipeline channel handler context. */
+    protected ChannelHandlerContext ctx;
+
+    /** Channel. */
+    protected Channel channel;
+
+    /** Netty pipeline handshake handler. */
+    protected HandshakeHandler handler;
+
+    /** Recovery descriptor. */
+    protected RecoveryDescriptor recoveryDescriptor;
+
+    /** {@inheritDoc} */
+    @Override
+    public void onInit(ChannelHandlerContext handlerContext) {
+        this.ctx = handlerContext;
+        this.channel = handlerContext.channel();
+        this.handler = (HandshakeHandler) ctx.handler();
+    }
+
+    /**
+     * Creates a message handler using the consistent id of a remote node.
+     *
+     * @return New message handler.
+     */
+    public MessageHandler createMessageHandler() {
+        return handler.createMessageHandler(remoteConsistentId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NettySender> handshakeFuture() {
+        return handshakeCompleteFuture;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void onConnectionOpen() {

Review Comment:
   Why do you need to override this method here?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/BaseRecoveryHandshakeManager.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.netty.HandshakeHandler;
+import org.apache.ignite.internal.network.netty.MessageHandler;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Base recovery handshake manager.
+ */
+public abstract class BaseRecoveryHandshakeManager implements HandshakeManager {
+    /** Handshake completion future. */
+    protected final CompletableFuture<NettySender> handshakeCompleteFuture = new CompletableFuture<>();
+
+    /** Remote node's launch id. */
+    protected UUID remoteLaunchId;
+
+    /** Remote node's consistent id. */
+    protected String remoteConsistentId;
+
+    /** Connection id. */
+    protected short connectionId;

Review Comment:
   This field is not used in this class; I would suggest encapsulating it in the child classes. The same applies to `recoveryDescriptor`.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java:
##########
@@ -28,29 +27,23 @@
  */
 public interface HandshakeManager {
     /**
-     * Initialize handshake manager with the channel.
+     * Initializes handshake manager with the channel.

Review Comment:
   ```suggestion
        * Initializes the handshake manager.
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java:
##########
@@ -87,34 +83,75 @@ public HandshakeResult onConnectionOpen(Channel channel) {
                 );
             }
         });
-
-        return HandshakeResult.noOp();
     }
 
     /** {@inheritDoc} */
     @Override
-    public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
+    public void onMessage(NetworkMessage message) {
         if (message instanceof HandshakeStartResponseMessage) {
             HandshakeStartResponseMessage msg = (HandshakeStartResponseMessage) message;
 
-            UUID remoteLaunchId = msg.launchId();
-            String remoteConsistentId = msg.consistentId();
+            this.remoteLaunchId = msg.launchId();
+            this.remoteConsistentId = msg.consistentId();
+            this.receivedCount = msg.receivedCount();
+            this.connectionId = msg.connectionId();
+
+            this.recoveryDescriptor = recoveryDescriptorProvider.getRecoveryDescriptor(remoteConsistentId, remoteLaunchId,
+                    connectionId, true);
 
-            handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId));
+            handshake(recoveryDescriptor);
 
-            return HandshakeResult.removeHandler(remoteLaunchId, remoteConsistentId);
+            return;
         }
 
-        handshakeCompleteFuture.completeExceptionally(
-                new HandshakeException("Unexpected message during handshake: " + message.toString())
-        );
+        assert recoveryDescriptor != null : "Wrong server handshake flow";
+
+        if (recoveryDescriptor.unacknowledgedCount() == 0) {
+            finishHandshake();
+        }
 
-        return HandshakeResult.fail();
+        ctx.fireChannelRead(message);
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<NettySender> handshakeFuture() {
-        return handshakeCompleteFuture;
+    private void handshake(RecoveryDescriptor descriptor) {
+        PipelineUtils.afterHandshake(ctx.pipeline(), descriptor, createMessageHandler(), messageFactory);
+
+        HandshakeFinishMessage response = messageFactory.handshakeFinishMessage()
+                .receivedCount(descriptor.receivedCount())
+                .build();
+
+        CompletableFuture<Void> sendFuture = NettyUtils.toCompletableFuture(
+                ctx.channel().writeAndFlush(new OutNetworkObject(response, Collections.emptyList(), false))
+        );
+
+        descriptor.acknowledge(receivedCount);
+
+        int unacknowledgedCount = (int) descriptor.unacknowledgedCount();
+
+        if (unacknowledgedCount > 0) {
+            var futs = new CompletableFuture[unacknowledgedCount + 1];
+            futs[0] = sendFuture;
+
+            List<OutNetworkObject> networkMessages = descriptor.unacknowledgedMessages();
+
+            for (int i = 0; i < networkMessages.size(); i++) {
+                OutNetworkObject networkMessage = networkMessages.get(i);
+                futs[i + 1] = NettyUtils.toCompletableFuture(ctx.channel().writeAndFlush(networkMessage));

Review Comment:
   Should we use `write` here and only `flush` at the end?
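
   To illustrate the difference, here is a small sketch with a hypothetical `FakeChannel` that only mirrors the `write`, `flush` and `writeAndFlush` method names of a Netty channel. It is not Netty code; it just counts how often buffered messages are pushed out:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical stand-in for a channel; it only mirrors the write/flush method names. */
   final class FakeChannel {
       private final List<String> pending = new ArrayList<>();
       private final List<String> sent = new ArrayList<>();
       private int flushCount;

       /** Queues a message without sending it. */
       void write(String msg) {
           pending.add(msg);
       }

       /** Sends everything queued so far in one go. */
       void flush() {
           sent.addAll(pending);
           pending.clear();
           flushCount++;
       }

       void writeAndFlush(String msg) {
           write(msg);
           flush();
       }

       int flushCount() {
           return flushCount;
       }

       List<String> sent() {
           return sent;
       }
   }

   final class ResendSketch {
       /** Current shape of the loop: one flush per unacknowledged message. */
       static FakeChannel resendFlushingEach(List<String> unacknowledged) {
           FakeChannel channel = new FakeChannel();
           for (String msg : unacknowledged) {
               channel.writeAndFlush(msg);
           }
           return channel;
       }

       /** Suggested shape: queue every message, then flush once at the end. */
       static FakeChannel resendFlushingOnce(List<String> unacknowledged) {
           FakeChannel channel = new FakeChannel();
           for (String msg : unacknowledged) {
               channel.write(msg);
           }
           channel.flush();
           return channel;
       }
   }
   ```

   Both variants deliver the same messages in the same order; the second one performs a single flush regardless of how many messages are resent.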



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -221,17 +226,17 @@ private void onNewIncomingChannel(NettySender channel) {
      * @param address Target address.
      * @return New netty client.
      */
-    private NettyClient connect(SocketAddress address) {
+    private NettyClient connect(SocketAddress address, short connectionId) {
         var client = new NettyClient(
                 address,
                 serializationService,
-                clientHandshakeManagerFactory.get(),
+                createClientHandshakeManager(connectionId),
                 this::onMessage
         );
 
-        client.start(clientBootstrap).whenComplete((sender, throwable) -> {
+        client.start(clientBootstrap).whenComplete((nodeInfo, throwable) -> {
             if (throwable == null) {
-                channels.put(sender.consistentId(), sender);

Review Comment:
   Why did you remove this line?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -221,17 +226,17 @@ private void onNewIncomingChannel(NettySender channel) {
      * @param address Target address.
      * @return New netty client.
      */
-    private NettyClient connect(SocketAddress address) {
+    private NettyClient connect(SocketAddress address, short connectionId) {
         var client = new NettyClient(
                 address,
                 serializationService,
-                clientHandshakeManagerFactory.get(),
+                createClientHandshakeManager(connectionId),
                 this::onMessage
         );
 
-        client.start(clientBootstrap).whenComplete((sender, throwable) -> {
+        client.start(clientBootstrap).whenComplete((nodeInfo, throwable) -> {

Review Comment:
   `nodeInfo`?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -284,6 +289,14 @@ public boolean isStopped() {
         return stopped.get();
     }
 
+    protected HandshakeManager createClientHandshakeManager(short connectionId) {

Review Comment:
   ```suggestion
       private HandshakeManager createClientHandshakeManager(short connectionId) {
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java:
##########
@@ -87,34 +83,75 @@ public HandshakeResult onConnectionOpen(Channel channel) {
                 );
             }
         });
-
-        return HandshakeResult.noOp();
     }
 
     /** {@inheritDoc} */
     @Override
-    public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
+    public void onMessage(NetworkMessage message) {
         if (message instanceof HandshakeStartResponseMessage) {
             HandshakeStartResponseMessage msg = (HandshakeStartResponseMessage) message;
 
-            UUID remoteLaunchId = msg.launchId();
-            String remoteConsistentId = msg.consistentId();
+            this.remoteLaunchId = msg.launchId();
+            this.remoteConsistentId = msg.consistentId();
+            this.receivedCount = msg.receivedCount();
+            this.connectionId = msg.connectionId();
+
+            this.recoveryDescriptor = recoveryDescriptorProvider.getRecoveryDescriptor(remoteConsistentId, remoteLaunchId,
+                    connectionId, true);
 
-            handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId));
+            handshake(recoveryDescriptor);
 
-            return HandshakeResult.removeHandler(remoteLaunchId, remoteConsistentId);
+            return;
         }
 
-        handshakeCompleteFuture.completeExceptionally(
-                new HandshakeException("Unexpected message during handshake: " + message.toString())
-        );
+        assert recoveryDescriptor != null : "Wrong server handshake flow";
+
+        if (recoveryDescriptor.unacknowledgedCount() == 0) {
+            finishHandshake();
+        }
 
-        return HandshakeResult.fail();
+        ctx.fireChannelRead(message);
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<NettySender> handshakeFuture() {
-        return handshakeCompleteFuture;
+    private void handshake(RecoveryDescriptor descriptor) {
+        PipelineUtils.afterHandshake(ctx.pipeline(), descriptor, createMessageHandler(), messageFactory);
+
+        HandshakeFinishMessage response = messageFactory.handshakeFinishMessage()
+                .receivedCount(descriptor.receivedCount())
+                .build();
+
+        CompletableFuture<Void> sendFuture = NettyUtils.toCompletableFuture(
+                ctx.channel().writeAndFlush(new OutNetworkObject(response, Collections.emptyList(), false))
+        );
+
+        descriptor.acknowledge(receivedCount);
+
+        int unacknowledgedCount = (int) descriptor.unacknowledgedCount();

Review Comment:
   Why did you make `unacknowledgedCount` a `long` if you cast it to `int`?
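
   For context, a short sketch of why the cast deserves attention. The class and method names are hypothetical; only the `(int)` cast and `Math.toIntExact` are standard Java:

   ```java
   final class CountNarrowingSketch {
       /** Plain cast: silently keeps only the low 32 bits of the value. */
       static int narrowUnchecked(long count) {
           return (int) count;
       }

       /** Checked conversion: throws ArithmeticException if the value does not fit in an int. */
       static int narrowChecked(long count) {
           return Math.toIntExact(count);
       }
   }
   ```

   If the count can never exceed the `int` range, returning an `int` directly avoids the question; otherwise a checked conversion at least fails loudly instead of truncating.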



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/BaseRecoveryHandshakeManager.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.netty.HandshakeHandler;
+import org.apache.ignite.internal.network.netty.MessageHandler;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Base recovery handshake manager.
+ */
+public abstract class BaseRecoveryHandshakeManager implements HandshakeManager {

Review Comment:
   I actually find this class very confusing, and the inheritors' logic is very difficult to understand. Is it possible to get rid of it, even at the cost of some copy-paste?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * Recovery protocol descriptor.
+ */
+public class RecoveryDescriptor {
+    /** Unacknowledged messages. */
+    private final ArrayDeque<OutNetworkObject> unacknowledgedMessages;

Review Comment:
   ```suggestion
       private final Queue<OutNetworkObject> unacknowledgedMessages;
   ```
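
   A rough sketch of what the field could look like when declared against the interface. The class name, the `String` payload and the acknowledgement semantics are assumptions made for illustration and do not reproduce the actual `RecoveryDescriptor`:

   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Queue;

   final class UnackedQueueSketch {
       /** Declared as Queue; ArrayDeque stays an implementation detail. */
       private final Queue<String> unacknowledged = new ArrayDeque<>();

       /** Number of messages the remote side has confirmed so far (assumed semantics). */
       private long acknowledgedCount;

       void add(String msg) {
           unacknowledged.add(msg);
       }

       /** Drops messages from the head until the confirmed total is reached. */
       void acknowledge(long receivedByRemote) {
           while (acknowledgedCount < receivedByRemote && !unacknowledged.isEmpty()) {
               unacknowledged.poll();
               acknowledgedCount++;
           }
       }

       int unacknowledgedCount() {
           return unacknowledged.size();
       }

       /** Returns a snapshot copy in sending order. */
       List<String> unacknowledgedMessages() {
           return new ArrayList<>(unacknowledged);
       }
   }
   ```

   Only `add`, `poll`, `isEmpty` and `size` are needed here, all of which the `Queue` interface provides.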



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -183,7 +188,7 @@ public CompletableFuture<NettySender> channel(@Nullable String consistentId, Soc
         // or didn't perform the handhsake operaton, can be reused.

Review Comment:
   ```suggestion
           // or didn't perform the handshake operation, can be reused.
   ```



##########
modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java:
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
+import org.apache.ignite.internal.network.serialization.SerializationService;
+import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
+import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.OutNetworkObject;
+import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
+import org.apache.ignite.network.TestMessagesFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Recovery protocol handshake flow test.
+ */
+public class RecoveryHandshakeTest {
+    /** Connection id. */
+    private static final short CONNECTION_ID = 1337;
+
+    /** Serialization registry. */
+    private static final MessageSerializationRegistry MESSAGE_REGISTRY = new TestMessageSerializationRegistryImpl();
+
+    /** Message factory. */
+    private static final NetworkMessagesFactory MESSAGE_FACTORY = new NetworkMessagesFactory();
+
+    /** Test message factory. */
+    private static final TestMessagesFactory TEST_MESSAGES_FACTORY = new TestMessagesFactory();
+
+    @Test
+    public void testHandshake() throws Exception {
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        var clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);
+        var serverHandshakeManager = createRecoveryServerHandshakeManager(serverRecovery);
+
+        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, noMessageListener);
+
+        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, noMessageListener);
+
+        assertTrue(serverSideChannel.isActive());
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        assertNull(clientSideChannel.readOutbound());
+        assertNull(serverSideChannel.readOutbound());
+
+        checkHandshakeCompleted(serverHandshakeManager);
+        checkHandshakeCompleted(clientHandshakeManager);
+
+        checkPipelineAfterHandshake(serverSideChannel);
+        checkPipelineAfterHandshake(clientSideChannel);
+    }
+
+    @Test
+    public void testHandshakeWithUnacknowledgedServerMessage() throws Exception {
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        UUID clientLaunchId = UUID.randomUUID();
+        RecoveryDescriptor serverRecoveryDescriptor = serverRecovery.getRecoveryDescriptor("client", clientLaunchId, CONNECTION_ID, true);
+        addUnacknowledgedMessages(serverRecoveryDescriptor);
+
+        var clientHandshakeManager = createRecoveryClientHandshakeManager("client", clientLaunchId, clientRecovery);
+        var serverHandshakeManager = createRecoveryServerHandshakeManager(serverRecovery);
+
+        var messageCaptor = new AtomicReference<TestMessage>();
+        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, (inObject) -> {
+            NetworkMessage msg = inObject.message();
+
+            assertInstanceOf(TestMessage.class, msg);
+
+            messageCaptor.set((TestMessage) msg);
+        });
+
+        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, noMessageListener);
+
+        assertTrue(serverSideChannel.isActive());
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        assertNull(clientSideChannel.readOutbound());
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        assertNull(serverSideChannel.readOutbound());
+
+        TestMessage ackedMessage = messageCaptor.get();
+        assertNotNull(ackedMessage);
+
+        checkHandshakeNotCompleted(serverHandshakeManager);
+        checkHandshakeCompleted(clientHandshakeManager);
+
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+
+        checkHandshakeCompleted(serverHandshakeManager);
+        checkHandshakeCompleted(clientHandshakeManager);
+
+        checkPipelineAfterHandshake(serverSideChannel);
+        checkPipelineAfterHandshake(clientSideChannel);
+    }
+
+    @Test
+    public void testHandshakeWithUnacknowledgedClientMessage() throws Exception {
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        UUID serverLaunchId = UUID.randomUUID();
+        RecoveryDescriptor clientRecoveryDescriptor = clientRecovery.getRecoveryDescriptor("server", serverLaunchId, CONNECTION_ID, false);
+        addUnacknowledgedMessages(clientRecoveryDescriptor);
+
+        var clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);
+        var serverHandshakeManager = createRecoveryServerHandshakeManager("server", serverLaunchId, serverRecovery);
+
+        var messageCaptor = new AtomicReference<TestMessage>();
+        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, noMessageListener);
+
+        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, (inObject) -> {
+            NetworkMessage msg = inObject.message();
+
+            assertInstanceOf(TestMessage.class, msg);
+
+            messageCaptor.set((TestMessage) msg);
+        });
+
+        assertTrue(serverSideChannel.isActive());
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        assertNull(serverSideChannel.readOutbound());
+
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        assertNull(clientSideChannel.readOutbound());
+
+        TestMessage ackedMessage = messageCaptor.get();
+        assertNotNull(ackedMessage);
+
+        checkHandshakeCompleted(serverHandshakeManager);
+        checkHandshakeNotCompleted(clientHandshakeManager);
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        checkHandshakeCompleted(serverHandshakeManager);
+        checkHandshakeCompleted(clientHandshakeManager);
+
+        checkPipelineAfterHandshake(serverSideChannel);
+        checkPipelineAfterHandshake(clientSideChannel);
+    }
+
+    @Test
+    public void testPairedRecoveryDescriptors() throws Exception {
+        RecoveryDescriptorProvider node1Recovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider node2Recovery = createRecoveryDescriptorProvider();
+
+        var node1Uuid = UUID.randomUUID();
+        var node2Uuid = UUID.randomUUID();
+
+        var chm1 = createRecoveryClientHandshakeManager("client", node1Uuid, node1Recovery);
+        var shm1 = createRecoveryServerHandshakeManager("client", node1Uuid, node1Recovery);
+
+        var chm2 = createRecoveryClientHandshakeManager("server", node2Uuid, node2Recovery);
+        var shm2 = createRecoveryServerHandshakeManager("server", node2Uuid, node2Recovery);
+
+        EmbeddedChannel out1to2 = setupChannel(chm1, noMessageListener);
+        EmbeddedChannel in1to2 = setupChannel(shm1, noMessageListener);
+        EmbeddedChannel out2to1 = setupChannel(chm2, noMessageListener);
+        EmbeddedChannel in2to1 = setupChannel(shm2, noMessageListener);
+
+        exchangeServerToClient(in1to2, out2to1);
+        exchangeServerToClient(in2to1, out1to2);
+
+        exchangeClientToServer(in1to2, out2to1);
+        exchangeClientToServer(in2to1, out1to2);
+
+        assertNotSame(chm1.recoveryDescriptor(), shm1.recoveryDescriptor());
+        assertNotSame(chm2.recoveryDescriptor(), shm2.recoveryDescriptor());
+    }
+
+    @Test
+    public void testExactlyOnceServer() throws Exception {
+        testExactlyOnce(true);
+    }
+
+    @Test
+    public void testExactlyOnceClient() throws Exception {
+        testExactlyOnce(false);
+    }
+
+    /**
+     * Tests that a message is received exactly once in case of a network failure during acknowledgement.
+     *
+     * @param serverDidntReceiveAck {@code true} if server didn't receive the acknowledgement, {@code false} if client didn't receive
+     *                              the acknowledgement.
+     * @throws Exception If failed.
+     */
+    private void testExactlyOnce(boolean serverDidntReceiveAck) throws Exception {
+        var server = "server";
+        var serverLaunchId = UUID.randomUUID();
+        var client = "client";
+        var clientLaunchId = UUID.randomUUID();
+
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        var clientHandshakeManager = createRecoveryClientHandshakeManager(client, clientLaunchId, clientRecovery);
+        var serverHandshakeManager = createRecoveryServerHandshakeManager(server, serverLaunchId, serverRecovery);
+
+        var receivedFirst = new AtomicBoolean();
+
+        var listener1 = new MessageListener("1", receivedFirst);
+
+        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, serverDidntReceiveAck ? listener1 : noMessageListener);
+        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, serverDidntReceiveAck ? noMessageListener : listener1);
+
+        // Normal handshake
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        var ch = serverDidntReceiveAck ? serverSideChannel : clientSideChannel;
+
+        // Add two messages to the outbound
+        ch.writeOutbound(new OutNetworkObject(TEST_MESSAGES_FACTORY.testMessage().msg("1").build(), Collections.emptyList()));
+        ch.writeOutbound(new OutNetworkObject(TEST_MESSAGES_FACTORY.testMessage().msg("2").build(), Collections.emptyList()));
+
+        // Send one of the messages
+        if (serverDidntReceiveAck) {
+            exchangeServerToClient(serverSideChannel, clientSideChannel);
+        } else {
+            exchangeClientToServer(serverSideChannel, clientSideChannel);
+        }
+
+        // Message should be received
+        assertTrue(receivedFirst.get());
+
+        // Transfer only one acknowledgement, don't transfer the second one (simulates network failure on acknowledgement)
+        if (serverDidntReceiveAck) {
+            exchangeClientToServer(serverSideChannel, clientSideChannel);
+        } else {
+            exchangeServerToClient(serverSideChannel, clientSideChannel);
+        }
+
+        // Simulate reconnection
+        clientHandshakeManager = createRecoveryClientHandshakeManager(client, clientLaunchId, clientRecovery);
+        serverHandshakeManager = createRecoveryServerHandshakeManager(server, serverLaunchId, serverRecovery);
+
+        var receivedSecond = new AtomicBoolean();
+
+        var listener2 = new MessageListener("2", receivedSecond);
+
+        clientSideChannel = setupChannel(clientHandshakeManager, serverDidntReceiveAck ? listener2 : noMessageListener);
+        serverSideChannel = setupChannel(serverHandshakeManager, serverDidntReceiveAck ? noMessageListener : listener2);
+
+        // Handshake
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        // Resending message
+        if (serverDidntReceiveAck) {
+            exchangeServerToClient(serverSideChannel, clientSideChannel);
+        } else {
+            exchangeClientToServer(serverSideChannel, clientSideChannel);
+        }
+
+        // Send another acknowledgement
+        if (serverDidntReceiveAck) {
+            exchangeClientToServer(serverSideChannel, clientSideChannel);
+        } else {
+            exchangeServerToClient(serverSideChannel, clientSideChannel);
+        }
+
+        assertNull(serverSideChannel.readOutbound());
+        assertNull(clientSideChannel.readOutbound());
+
+        assertTrue(receivedSecond.get());
+    }
+
+    /** Message listener that accepts a specific message only once. */
+    private static class MessageListener implements Consumer<InNetworkObject> {
+        /** Expected message. */
+        private final String expectedMessage;
+
+        /** Flag indicating that the expected message was received. */
+        private final AtomicBoolean flag;
+
+        private MessageListener(String expectedMessage, AtomicBoolean flag) {
+            this.expectedMessage = expectedMessage;
+            this.flag = flag;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void accept(InNetworkObject inNetworkObject) {
+            var msg = (TestMessage) inNetworkObject.message();
+            if (expectedMessage.equals(msg.msg())) {
+                if (!flag.compareAndSet(false, true)) {
+                    fail();
+                }
+                return;
+            }
+            fail();
+        }
+    }
+
+    private void checkPipelineAfterHandshake(EmbeddedChannel channel) {
+        assertNull(channel.pipeline().get(HandshakeHandler.NAME));
+    }
+
+    private void checkHandshakeNotCompleted(HandshakeManager manager) {
+        CompletableFuture<NettySender> handshakeFuture = manager.handshakeFuture();
+        assertFalse(handshakeFuture.isDone());
+        assertFalse(handshakeFuture.isCompletedExceptionally());
+        assertFalse(handshakeFuture.isCancelled());
+    }
+
+    private void checkHandshakeCompleted(HandshakeManager manager) {
+        CompletableFuture<NettySender> handshakeFuture = manager.handshakeFuture();
+        assertTrue(handshakeFuture.isDone());
+        assertFalse(handshakeFuture.isCompletedExceptionally());
+        assertFalse(handshakeFuture.isCancelled());
+    }
+
+    private void addUnacknowledgedMessages(RecoveryDescriptor recoveryDescriptor) {
+        TestMessage msg = TEST_MESSAGES_FACTORY.testMessage().msg("test").build();
+        recoveryDescriptor.add(new OutNetworkObject(msg, Collections.emptyList()));
+    }
+
+    private void exchangeServerToClient(EmbeddedChannel serverSideChannel, EmbeddedChannel clientSideChannel) {
+        ByteBuf handshakeStartMessage = serverSideChannel.readOutbound();
+        clientSideChannel.writeInbound(handshakeStartMessage);
+    }
+
+    private void exchangeClientToServer(EmbeddedChannel serverSideChannel, EmbeddedChannel clientSideChannel) {
+        ByteBuf handshakeStartMessage = clientSideChannel.readOutbound();
+        serverSideChannel.writeInbound(handshakeStartMessage);
+    }
+
+    private final Consumer<InNetworkObject> noMessageListener = inNetworkObject ->
+            fail("Received message while shouldn't have, [" + inNetworkObject.message() + "]");
+
+    private EmbeddedChannel setupChannel(HandshakeManager handshakeManager, Consumer<InNetworkObject> messageListener) throws Exception {
+        // Channel should not be registered at first, not before we add pipeline handlers
+        // Otherwise, events like "channel active" won't be propagated to the handlers
+        var channel = new EmbeddedChannel(false, false);
+
+        var serializationService = new SerializationService(MESSAGE_REGISTRY, createUserObjectSerializationContext());
+        var sessionSerializationService = new PerSessionSerializationService(serializationService);
+
+        PipelineUtils.setup(channel.pipeline(), sessionSerializationService, handshakeManager, messageListener);
+
+        channel.register();
+
+        return channel;
+    }
+
+    private UserObjectSerializationContext createUserObjectSerializationContext() {
+        var userObjectDescriptorRegistry = new ClassDescriptorRegistry();
+        var userObjectDescriptorFactory = new ClassDescriptorFactory(userObjectDescriptorRegistry);
+
+        var userObjectMarshaller = new DefaultUserObjectMarshaller(userObjectDescriptorRegistry, userObjectDescriptorFactory);
+
+        return new UserObjectSerializationContext(userObjectDescriptorRegistry, userObjectDescriptorFactory,
+                userObjectMarshaller);
+    }
+
+    private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(RecoveryDescriptorProvider provider) {
+        return createRecoveryClientHandshakeManager("client", UUID.randomUUID(), provider);
+    }
+
+    private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(String consistentId, UUID launchId,
+            RecoveryDescriptorProvider provider) {
+        return new RecoveryClientHandshakeManager(launchId, consistentId, CONNECTION_ID, MESSAGE_FACTORY, provider);
+    }
+
+    private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(RecoveryDescriptorProvider provider) {
+        return createRecoveryServerHandshakeManager("server", UUID.randomUUID(), provider);
+    }
+
+    private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(String consistentId, UUID launchId,
+            RecoveryDescriptorProvider provider) {
+        return new RecoveryServerHandshakeManager(launchId, consistentId, MESSAGE_FACTORY, provider);
+    }
+
+    private RecoveryDescriptorProvider createRecoveryDescriptorProvider() {

Review Comment:
   What's the point of this method?


