Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/05/10 19:05:53 UTC

[GitHub] [ignite-3] SammyVimes opened a new pull request, #804: IGNITE-14085 Introduce network recovery protocol

SammyVimes opened a new pull request, #804:
URL: https://github.com/apache/ignite-3/pull/804

   https://issues.apache.org/jira/browse/IGNITE-14085


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #804: IGNITE-14085 Introduce network recovery protocol

SammyVimes commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r872326913


##########
modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java:
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
+import org.apache.ignite.internal.network.serialization.SerializationService;
+import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
+import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.OutNetworkObject;
+import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
+import org.apache.ignite.network.TestMessagesFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Recovery protocol handshake flow test.
+ */
+public class RecoveryHandshakeTest {
+    /** Connection id. */
+    private static final short CONNECTION_ID = 1337;
+
+    /** Serialization registry. */
+    private static final MessageSerializationRegistry MESSAGE_REGISTRY = new TestMessageSerializationRegistryImpl();
+
+    /** Message factory. */
+    private static final NetworkMessagesFactory MESSAGE_FACTORY = new NetworkMessagesFactory();
+
+    /** Test message factory. */
+    private static final TestMessagesFactory TEST_MESSAGES_FACTORY = new TestMessagesFactory();
+
+    @Test
+    public void testHandshake() throws Exception {
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        var clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);
+        var serverHandshakeManager = createRecoveryServerHandshakeManager(serverRecovery);
+
+        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, noMessageListener);
+
+        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, noMessageListener);
+
+        assertTrue(serverSideChannel.isActive());
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        assertNull(clientSideChannel.readOutbound());
+        assertNull(serverSideChannel.readOutbound());
+
+        checkHandshakeCompleted(serverHandshakeManager);
+        checkHandshakeCompleted(clientHandshakeManager);
+
+        checkPipelineAfterHandshake(serverSideChannel);
+        checkPipelineAfterHandshake(clientSideChannel);
+    }
+
+    @Test
+    public void testHandshakeWithUnacknowledgedServerMessage() throws Exception {
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        UUID clientLaunchId = UUID.randomUUID();
+        RecoveryDescriptor serverRecoveryDescriptor = serverRecovery.getRecoveryDescriptor("client", clientLaunchId, CONNECTION_ID, true);
+        addUnacknowledgedMessages(serverRecoveryDescriptor);
+
+        var clientHandshakeManager = createRecoveryClientHandshakeManager("client", clientLaunchId, clientRecovery);
+        var serverHandshakeManager = createRecoveryServerHandshakeManager(serverRecovery);
+
+        var messageCaptor = new AtomicReference<TestMessage>();
+        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, (inObject) -> {
+            NetworkMessage msg = inObject.message();
+
+            assertInstanceOf(TestMessage.class, msg);
+
+            messageCaptor.set((TestMessage) msg);
+        });
+
+        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, noMessageListener);
+
+        assertTrue(serverSideChannel.isActive());
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        assertNull(clientSideChannel.readOutbound());
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        assertNull(serverSideChannel.readOutbound());
+
+        TestMessage ackedMessage = messageCaptor.get();
+        assertNotNull(ackedMessage);
+
+        checkHandshakeNotCompleted(serverHandshakeManager);
+        checkHandshakeCompleted(clientHandshakeManager);
+
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+
+        checkHandshakeCompleted(serverHandshakeManager);
+        checkHandshakeCompleted(clientHandshakeManager);
+
+        checkPipelineAfterHandshake(serverSideChannel);
+        checkPipelineAfterHandshake(clientSideChannel);
+    }
+
+    @Test
+    public void testHandshakeWithUnacknowledgedClientMessage() throws Exception {
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        UUID serverLaunchId = UUID.randomUUID();
+        RecoveryDescriptor clientRecoveryDescriptor = clientRecovery.getRecoveryDescriptor("server", serverLaunchId, CONNECTION_ID, false);
+        addUnacknowledgedMessages(clientRecoveryDescriptor);
+
+        var clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);
+        var serverHandshakeManager = createRecoveryServerHandshakeManager("server", serverLaunchId, serverRecovery);
+
+        var messageCaptor = new AtomicReference<TestMessage>();
+        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, noMessageListener);
+
+        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, (inObject) -> {
+            NetworkMessage msg = inObject.message();
+
+            assertInstanceOf(TestMessage.class, msg);
+
+            messageCaptor.set((TestMessage) msg);
+        });
+
+        assertTrue(serverSideChannel.isActive());
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        assertNull(serverSideChannel.readOutbound());
+
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        assertNull(clientSideChannel.readOutbound());
+
+        TestMessage ackedMessage = messageCaptor.get();
+        assertNotNull(ackedMessage);
+
+        checkHandshakeCompleted(serverHandshakeManager);
+        checkHandshakeNotCompleted(clientHandshakeManager);
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        checkHandshakeCompleted(serverHandshakeManager);
+        checkHandshakeCompleted(clientHandshakeManager);
+
+        checkPipelineAfterHandshake(serverSideChannel);
+        checkPipelineAfterHandshake(clientSideChannel);
+    }
+
+    @Test
+    public void testPairedRecoveryDescriptors() throws Exception {
+        RecoveryDescriptorProvider node1Recovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider node2Recovery = createRecoveryDescriptorProvider();
+
+        var node1Uuid = UUID.randomUUID();
+        var node2Uuid = UUID.randomUUID();
+
+        var chm1 = createRecoveryClientHandshakeManager("client", node1Uuid, node1Recovery);
+        var shm1 = createRecoveryServerHandshakeManager("client", node1Uuid, node1Recovery);
+
+        var chm2 = createRecoveryClientHandshakeManager("server", node2Uuid, node2Recovery);
+        var shm2 = createRecoveryServerHandshakeManager("server", node2Uuid, node2Recovery);
+
+        EmbeddedChannel out1to2 = setupChannel(chm1, noMessageListener);
+        EmbeddedChannel in1to2 = setupChannel(shm1, noMessageListener);
+        EmbeddedChannel out2to1 = setupChannel(chm2, noMessageListener);
+        EmbeddedChannel in2to1 = setupChannel(shm2, noMessageListener);
+
+        exchangeServerToClient(in1to2, out2to1);
+        exchangeServerToClient(in2to1, out1to2);
+
+        exchangeClientToServer(in1to2, out2to1);
+        exchangeClientToServer(in2to1, out1to2);
+
+        assertNotSame(chm1.recoveryDescriptor(), shm1.recoveryDescriptor());
+        assertNotSame(chm2.recoveryDescriptor(), shm2.recoveryDescriptor());
+    }
+
+    @Test
+    public void testExactlyOnceServer() throws Exception {
+        testExactlyOnce(true);
+    }
+
+    @Test
+    public void testExactlyOnceClient() throws Exception {
+        testExactlyOnce(false);
+    }
+
+    /**
+     * Tests that a message is received exactly once in case of a network failure during acknowledgement.
+     *
+     * @param serverDidntReceiveAck {@code true} if server didn't receive the acknowledgement, {@code false} if client didn't receive
+     *                              the acknowledgement.
+     * @throws Exception If failed.
+     */
+    private void testExactlyOnce(boolean serverDidntReceiveAck) throws Exception {
+        var server = "server";
+        var serverLaunchId = UUID.randomUUID();
+        var client = "client";
+        var clientLaunchId = UUID.randomUUID();
+
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        var clientHandshakeManager = createRecoveryClientHandshakeManager(client, clientLaunchId, clientRecovery);
+        var serverHandshakeManager = createRecoveryServerHandshakeManager(server, serverLaunchId, serverRecovery);
+
+        var receivedFirst = new AtomicBoolean();
+
+        var listener1 = new MessageListener("1", receivedFirst);
+
+        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, serverDidntReceiveAck ? listener1 : noMessageListener);
+        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, serverDidntReceiveAck ?  noMessageListener : listener1);
+
+        // Normal handshake
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        var ch = serverDidntReceiveAck ? serverSideChannel : clientSideChannel;
+
+        // Add two messages to the outbound
+        ch.writeOutbound(new OutNetworkObject(TEST_MESSAGES_FACTORY.testMessage().msg("1").build(), Collections.emptyList()));
+        ch.writeOutbound(new OutNetworkObject(TEST_MESSAGES_FACTORY.testMessage().msg("2").build(), Collections.emptyList()));
+
+        // Send one of the messages
+        if (serverDidntReceiveAck) {
+            exchangeServerToClient(serverSideChannel, clientSideChannel);
+        } else {
+            exchangeClientToServer(serverSideChannel, clientSideChannel);
+        }
+
+        // Message should be received
+        assertTrue(receivedFirst.get());
+
+        // Transfer only one acknowledgement, don't transfer the second one (simulates network failure on acknowledgement)
+        if (serverDidntReceiveAck) {
+            exchangeClientToServer(serverSideChannel, clientSideChannel);
+        } else {
+            exchangeServerToClient(serverSideChannel, clientSideChannel);
+        }
+
+        // Simulate reconnection
+        clientHandshakeManager = createRecoveryClientHandshakeManager(client, clientLaunchId, clientRecovery);
+        serverHandshakeManager = createRecoveryServerHandshakeManager(server, serverLaunchId, serverRecovery);
+
+        var receivedSecond = new AtomicBoolean();
+
+        var listener2 = new MessageListener("2", receivedSecond);
+
+        clientSideChannel = setupChannel(clientHandshakeManager, serverDidntReceiveAck ? listener2 : noMessageListener);
+        serverSideChannel = setupChannel(serverHandshakeManager, serverDidntReceiveAck ? noMessageListener : listener2);
+
+        // Handshake
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        // Resending message
+        if (serverDidntReceiveAck) {
+            exchangeServerToClient(serverSideChannel, clientSideChannel);
+        } else {
+            exchangeClientToServer(serverSideChannel, clientSideChannel);
+        }
+
+        // Send another acknowledgement
+        if (serverDidntReceiveAck) {
+            exchangeClientToServer(serverSideChannel, clientSideChannel);
+        } else {
+            exchangeServerToClient(serverSideChannel, clientSideChannel);
+        }
+
+        assertNull(serverSideChannel.readOutbound());
+        assertNull(clientSideChannel.readOutbound());
+
+        assertTrue(receivedSecond.get());
+    }
+
+    /** Message listener that accepts a specific message only once. */
+    private static class MessageListener implements Consumer<InNetworkObject> {
+        /** Expected message. */
+        private final String expectedMessage;
+
+        /** Flag indicating that expected messages was received. */
+        private final AtomicBoolean flag;
+
+        private MessageListener(String expectedMessage, AtomicBoolean flag) {
+            this.expectedMessage = expectedMessage;
+            this.flag = flag;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void accept(InNetworkObject inNetworkObject) {
+            var msg = (TestMessage) inNetworkObject.message();
+            if (expectedMessage.equals(msg.msg())) {
+                if (!flag.compareAndSet(false, true)) {
+                    fail();
+                }
+                return;
+            }
+            fail();
+        }
+    }
+
+    private void checkPipelineAfterHandshake(EmbeddedChannel channel) {
+        assertNull(channel.pipeline().get(HandshakeHandler.NAME));
+    }
+
+    private void checkHandshakeNotCompleted(HandshakeManager manager) {
+        CompletableFuture<NettySender> handshakeFuture = manager.handshakeFuture();
+        assertFalse(handshakeFuture.isDone());
+        assertFalse(handshakeFuture.isCompletedExceptionally());
+        assertFalse(handshakeFuture.isCancelled());
+    }
+
+    private void checkHandshakeCompleted(HandshakeManager manager) {
+        CompletableFuture<NettySender> handshakeFuture = manager.handshakeFuture();
+        assertTrue(handshakeFuture.isDone());
+        assertFalse(handshakeFuture.isCompletedExceptionally());
+        assertFalse(handshakeFuture.isCancelled());
+    }
+
+    private void addUnacknowledgedMessages(RecoveryDescriptor recoveryDescriptor) {
+        TestMessage msg = TEST_MESSAGES_FACTORY.testMessage().msg("test").build();
+        recoveryDescriptor.add(new OutNetworkObject(msg, Collections.emptyList()));
+    }
+
+    private void exchangeServerToClient(EmbeddedChannel serverSideChannel, EmbeddedChannel clientSideChannel) {
+        ByteBuf handshakeStartMessage = serverSideChannel.readOutbound();
+        clientSideChannel.writeInbound(handshakeStartMessage);
+    }
+
+    private void exchangeClientToServer(EmbeddedChannel serverSideChannel, EmbeddedChannel clientSideChannel) {
+        ByteBuf handshakeStartMessage = clientSideChannel.readOutbound();
+        serverSideChannel.writeInbound(handshakeStartMessage);
+    }
+
+    private final Consumer<InNetworkObject> noMessageListener = inNetworkObject ->
+            fail("Received message while shouldn't have, [" + inNetworkObject.message() + "]");
+
+    private EmbeddedChannel setupChannel(HandshakeManager handshakeManager, Consumer<InNetworkObject> messageListener) throws Exception {
+        // Channel should not be registered at first, not before we add pipeline handlers
+        // Otherwise, events like "channel active" won't be propagated to the handlers
+        var channel = new EmbeddedChannel(false, false);
+
+        var serializationService = new SerializationService(MESSAGE_REGISTRY, createUserObjectSerializationContext());
+        var sessionSerializationService = new PerSessionSerializationService(serializationService);
+
+        PipelineUtils.setup(channel.pipeline(), sessionSerializationService, handshakeManager, messageListener);
+
+        channel.register();
+
+        return channel;
+    }
+
+    private UserObjectSerializationContext createUserObjectSerializationContext() {
+        var userObjectDescriptorRegistry = new ClassDescriptorRegistry();
+        var userObjectDescriptorFactory = new ClassDescriptorFactory(userObjectDescriptorRegistry);
+
+        var userObjectMarshaller = new DefaultUserObjectMarshaller(userObjectDescriptorRegistry, userObjectDescriptorFactory);
+
+        return new UserObjectSerializationContext(userObjectDescriptorRegistry, userObjectDescriptorFactory,
+                userObjectMarshaller);
+    }
+
+    private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(RecoveryDescriptorProvider provider) {
+        return createRecoveryClientHandshakeManager("client", UUID.randomUUID(), provider);
+    }
+
+    private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(String consistentId, UUID launchId,
+            RecoveryDescriptorProvider provider) {
+        return new RecoveryClientHandshakeManager(launchId, consistentId, CONNECTION_ID, MESSAGE_FACTORY, provider);
+    }
+
+    private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(RecoveryDescriptorProvider provider) {
+        return createRecoveryServerHandshakeManager("server", UUID.randomUUID(), provider);
+    }
+
+    private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(String consistentId, UUID launchId,
+            RecoveryDescriptorProvider provider) {
+        return new RecoveryServerHandshakeManager(launchId, consistentId, MESSAGE_FACTORY, provider);
+    }
+
+    private RecoveryDescriptorProvider createRecoveryDescriptorProvider() {

Review Comment:
   It will be used later, once there is another descriptor provider.
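For context, here is one shape a second, in-memory descriptor provider could take. This is a minimal sketch. It assumes, based only on the test's call `getRecoveryDescriptor(consistentId, launchId, connectionId, inbound)`, that there is one descriptor per (consistent id, launch id, connection id, direction) tuple. `InMemoryDescriptorProvider`, `SketchDescriptor` and `Key` are hypothetical names, not Ignite's API, and the nested record requires Java 16+.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a recovery descriptor: it only holds unacknowledged messages. */
class SketchDescriptor {
    final Deque<Object> unacknowledged = new ArrayDeque<>();
}

/** Hypothetical in-memory provider: one descriptor per remote node launch, connection and direction. */
class InMemoryDescriptorProvider {
    /** Composite lookup key; the record supplies equals() and hashCode(). */
    private record Key(String consistentId, UUID launchId, short connectionId, boolean inbound) { }

    private final Map<Key, SketchDescriptor> descriptors = new ConcurrentHashMap<>();

    SketchDescriptor getRecoveryDescriptor(String consistentId, UUID launchId, short connectionId, boolean inbound) {
        // The same key always yields the same descriptor, so its unacknowledged messages outlive a single channel.
        return descriptors.computeIfAbsent(
                new Key(consistentId, launchId, connectionId, inbound),
                key -> new SketchDescriptor()
        );
    }
}
```

Because the direction flag is part of the key, the client-side and server-side handshake managers of one node would get different descriptors for the same peer. That would be consistent with the `assertNotSame` checks in `testPairedRecoveryDescriptors`. A new launch id (a restarted peer) gets a fresh descriptor.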


[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #804: IGNITE-14085 Introduce network recovery protocol

SammyVimes commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r871171542


##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.util.Collections;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * Inbound handler that handles incoming acknowledgement messages and sends acknowledgement messages for other messages.
+ */
+public class InboundRecoveryHandler extends ChannelInboundHandlerAdapter {
+    /** Handler name. */
+    public static final String NAME = "inbound-recovery-handler";
+
+    /** Recovery descriptor. */
+    private final RecoveryDescriptor descriptor;
+
+    /** Messages factory. */
+    private final NetworkMessagesFactory factory;
+
+    /**
+     * Constructor.
+     *
+     * @param descriptor Recovery descriptor.
+     * @param factory Message factory.
+     */
+    public InboundRecoveryHandler(RecoveryDescriptor descriptor, NetworkMessagesFactory factory) {
+        this.descriptor = descriptor;
+        this.factory = factory;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        NetworkMessage message = (NetworkMessage) msg;
+
+        if (message instanceof AcknowledgementMessage) {
+            AcknowledgementMessage ackMessage = (AcknowledgementMessage) msg;
+            long receivedMessages = ackMessage.receivedMessages();
+
+            descriptor.acknowledge(receivedMessages);
+        } else if (message.needAck()) {
+            AcknowledgementMessage ackMsg = factory.acknowledgementMessage()

Review Comment:
   Updated the ticket


[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #804: IGNITE-14085 Introduce network recovery protocol

SammyVimes commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r871108192


##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.util.Collections;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * Inbound handler that handles incoming acknowledgement messages and sends acknowledgement messages for other messages.
+ */
+public class InboundRecoveryHandler extends ChannelInboundHandlerAdapter {
+    /** Handler name. */
+    public static final String NAME = "inbound-recovery-handler";
+
+    /** Recovery descriptor. */
+    private final RecoveryDescriptor descriptor;
+
+    /** Messages factory. */
+    private final NetworkMessagesFactory factory;
+
+    /**
+     * Constructor.
+     *
+     * @param descriptor Recovery descriptor.
+     * @param factory Message factory.
+     */
+    public InboundRecoveryHandler(RecoveryDescriptor descriptor, NetworkMessagesFactory factory) {
+        this.descriptor = descriptor;
+        this.factory = factory;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        NetworkMessage message = (NetworkMessage) msg;
+
+        if (message instanceof AcknowledgementMessage) {
+            AcknowledgementMessage ackMessage = (AcknowledgementMessage) msg;
+            long receivedMessages = ackMessage.receivedMessages();
+
+            descriptor.acknowledge(receivedMessages);
+        } else if (message.needAck()) {
+            AcknowledgementMessage ackMsg = factory.acknowledgementMessage()

Review Comment:
   This is a quick-and-dirty approach; a proper one will be addressed in this ticket: https://issues.apache.org/jira/browse/IGNITE-16954
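The per-message scheme in the diff above can be modelled as follows. The receiver counts every message that needs an acknowledgement and replies with its running total. The sender then drops that many messages from the head of its unacknowledged queue. This is a sketch under the assumption that `receivedMessages` is a cumulative count, as the name `AcknowledgementMessage.receivedMessages()` suggests. `SenderState` and `ReceiverState` are hypothetical classes, not the actual `RecoveryDescriptor`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sender-side state: messages sent but not yet acknowledged. */
class SenderState<T> {
    private final Deque<T> unacknowledged = new ArrayDeque<>();

    /** Total number of messages the remote side has confirmed so far. */
    private long acknowledged;

    void add(T message) {
        unacknowledged.addLast(message);
    }

    /** Handles a cumulative ack: the remote side has received {@code receivedMessages} messages in total. */
    void acknowledge(long receivedMessages) {
        // Late or duplicated acks carry a count we already processed, so they remove nothing.
        while (acknowledged < receivedMessages && !unacknowledged.isEmpty()) {
            unacknowledged.pollFirst();
            acknowledged++;
        }
    }

    /** Messages that would have to be resent after a reconnect. */
    List<T> unacknowledgedMessages() {
        return new ArrayList<>(unacknowledged);
    }
}

/** Hypothetical receiver-side state: how many messages were delivered to the application. */
class ReceiverState {
    private long received;

    /** Called for each message that needs an ack; returns the count to put into the ack. */
    long onMessage() {
        return ++received;
    }

    long received() {
        return received;
    }
}
```

On reconnect, the receiver's `received()` count could be sent during the handshake. The sender would call `acknowledge` with that count and resend only what remains. This gives the exactly-once behaviour that `testExactlyOnce` checks, provided both counters survive the reconnect (for example, by living in a descriptor that outlives the channel).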


[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #804: IGNITE-14085 Introduce network recovery protocol

SammyVimes commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r872318472


##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -221,17 +226,17 @@ private void onNewIncomingChannel(NettySender channel) {
      * @param address Target address.
      * @return New netty client.
      */
-    private NettyClient connect(SocketAddress address) {
+    private NettyClient connect(SocketAddress address, short connectionId) {
         var client = new NettyClient(
                 address,
                 serializationService,
-                clientHandshakeManagerFactory.get(),
+                createClientHandshakeManager(connectionId),
                 this::onMessage
         );
 
-        client.start(clientBootstrap).whenComplete((sender, throwable) -> {
+        client.start(clientBootstrap).whenComplete((nodeInfo, throwable) -> {
             if (throwable == null) {
-                channels.put(sender.consistentId(), sender);

Review Comment:
   Whoops, some leftovers


[GitHub] [ignite-3] ibessonov commented on a diff in pull request #804: IGNITE-14085 Introduce network recovery protocol

ibessonov commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r871125999


##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.util.Collections;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * Inbound handler that handles incoming acknowledgement messages and sends acknowledgement messages for other messages.
+ */
+public class InboundRecoveryHandler extends ChannelInboundHandlerAdapter {
+    /** Handler name. */
+    public static final String NAME = "inbound-recovery-handler";
+
+    /** Recovery descriptor. */
+    private final RecoveryDescriptor descriptor;
+
+    /** Messages factory. */
+    private final NetworkMessagesFactory factory;
+
+    /**
+     * Constructor.
+     *
+     * @param descriptor Recovery descriptor.
+     * @param factory Message factory.
+     */
+    public InboundRecoveryHandler(RecoveryDescriptor descriptor, NetworkMessagesFactory factory) {
+        this.descriptor = descriptor;
+        this.factory = factory;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        NetworkMessage message = (NetworkMessage) msg;
+
+        if (message instanceof AcknowledgementMessage) {
+            AcknowledgementMessage ackMessage = (AcknowledgementMessage) msg;
+            long receivedMessages = ackMessage.receivedMessages();
+
+            descriptor.acknowledge(receivedMessages);
+        } else if (message.needAck()) {
+            AcknowledgementMessage ackMsg = factory.acknowledgementMessage()

Review Comment:
   The ticket is poorly described. Can you elaborate on what you mean by all of it?
   Also, I don't see any mention of batched acknowledgements there.
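Some context on batched acknowledgements: `RecoveryDescriptor.acknowledge(long)` in this PR takes the *cumulative* number of messages received by the remote node, so one acknowledgement can cover any number of earlier messages. Below is a minimal sketch of a receiver that takes advantage of this. It assumes a hypothetical `BatchingAcknowledger` that sends one cumulative count every `batchSize` messages, plus a `flush()` for partial batches (for example, driven from Netty's `channelReadComplete` or a timer). The `sentAcks` list stands in for writing an `AcknowledgementMessage` to the channel. This is not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: batches acknowledgements by sending one cumulative
 * "received count" every batchSize messages instead of one ack per message.
 */
public class BatchingAcknowledger {
    private final int batchSize;
    private long receivedCount;
    private long lastAckedCount;
    /** Stands in for sending an AcknowledgementMessage over the channel. */
    private final List<Long> sentAcks = new ArrayList<>();

    public BatchingAcknowledger(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /** Called for every received message that needs an ack. */
    public void onMessageReceived() {
        receivedCount++;
        if (receivedCount - lastAckedCount >= batchSize) {
            sendAck();
        }
    }

    /** Acknowledges a partial batch so it is not left unacknowledged indefinitely. */
    public void flush() {
        if (receivedCount > lastAckedCount) {
            sendAck();
        }
    }

    private void sendAck() {
        // One cumulative count covers every message received so far.
        sentAcks.add(receivedCount);
        lastAckedCount = receivedCount;
    }

    public List<Long> sentAcks() {
        return sentAcks;
    }

    public static void main(String[] args) {
        BatchingAcknowledger acker = new BatchingAcknowledger(4);
        for (int i = 0; i < 10; i++) {
            acker.onMessageReceived();
        }
        acker.flush();
        System.out.println(acker.sentAcks()); // [4, 8, 10]
    }
}
```

Ten messages produce three acknowledgements instead of ten. Choosing the batch size is a trade-off: larger batches send fewer acks, but the sender's unacknowledged queue stays full for longer.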



[GitHub] [ignite-3] SammyVimes commented on pull request #804: IGNITE-14085 Introduce network recovery protocol

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on PR #804:
URL: https://github.com/apache/ignite-3/pull/804#issuecomment-1126191685

   Merged to the main branch, thanks for the review


[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #804: IGNITE-14085 Introduce network recovery protocol

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r872308832


##########
modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java:
##########
@@ -28,29 +27,23 @@
  */
 public interface HandshakeManager {
     /**
-     * Initialize handshake manager with the channel.
+     * Initializes handshake manager with the channel.
      *
-     * @param channel Channel.
-     * @return Action to perform by {@link HandshakeHandler}.
+     * @param handlerContext Channel handler context.
      */
-    HandshakeResult init(Channel channel);
+    void onInit(ChannelHandlerContext handlerContext);

Review Comment:
   To make the naming consistent (see the other methods). It's event handling, and I prefer "on" + EventName.



[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #804: IGNITE-14085 Introduce network recovery protocol

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r871115838


##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * Recovery protocol descriptor.
+ */
+public class RecoveryDescriptor {
+    /** Unacknowledged messages. */
+    private final ArrayDeque<OutNetworkObject> unacknowledgedMessages;
+
+    /** Count of sent messages. */
+    private long sentCount;
+
+    /** Count of acknowledged sent messages. */
+    private long acknowledgedCount;
+
+    /** Count of received messages. */
+    private long receivedCount;
+
+    /**
+     * Constructor.
+     *
+     * @param queueLimit Maximum size of unacknowledged messages queue.
+     */
+    public RecoveryDescriptor(int queueLimit) {
+        this.unacknowledgedMessages = new ArrayDeque<>(queueLimit);
+    }
+
+    /**
+     * Returns count of received messages.
+     *
+     * @return Count of received messages.
+     */
+    public long receivedCount() {
+        return receivedCount;
+    }
+
+    /**
+     * Acknowledges that sent messages were received by the remote node.
+     *
+     * @param messagesReceivedByRemote Number of all messages received by the remote node.
+     */
+    public void acknowledge(long messagesReceivedByRemote) {
+        while (acknowledgedCount < messagesReceivedByRemote) {
+            OutNetworkObject req = unacknowledgedMessages.pollFirst();
+
+            assert req != null;
+
+            acknowledgedCount++;
+        }
+    }
+
+    /**
+     * Returns the number of the messages unacknowledged by the remote node.
+     *
+     * @return The number of the messages unacknowledged by the remote node.
+     */
+    public long unacknowledgedCount() {
+        long res = sentCount - acknowledgedCount;
+
+        assert res >= 0;
+        assert res == unacknowledgedMessages.size();
+
+        return res;
+    }
+
+    /**
+     * Returns unacknowledged messages.
+     *
+     * @return Unacknowledged messages.
+     */
+    public List<OutNetworkObject> unacknowledgedMessages() {
+        return new ArrayList<>(unacknowledgedMessages);
+    }
+
+    /**
+     * Adds a sent message.
+     *
+     * @param msg Message.
+     */
+    public void add(OutNetworkObject msg) {
+        msg.shouldBeSavedForRecovery(false);
+        sentCount++;
+        unacknowledgedMessages.addLast(msg);

Review Comment:
   Will be addressed here: https://issues.apache.org/jira/browse/IGNITE-16954. Basically, we will close the connection in the event of a queue "overflow".
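Here is a minimal sketch of the "close the connection on overflow" idea. It assumes a hypothetical `BoundedUnackedQueue` that enforces a hard limit itself and reports overflow to its caller. It is not the actual `RecoveryDescriptor` and not the IGNITE-16954 implementation. The caller (for example, an outbound handler) would close the channel when `tryAdd` returns `false`.

```java
import java.util.ArrayDeque;

/**
 * Hypothetical sketch (not the actual RecoveryDescriptor): a queue of unacknowledged
 * messages that enforces a hard limit and reports overflow to the caller.
 */
public class BoundedUnackedQueue<T> {
    private final ArrayDeque<T> queue;
    private final int limit;
    private long acknowledgedCount;

    public BoundedUnackedQueue(int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        this.limit = limit;
        // The constructor argument is only an initial capacity; the limit is enforced in tryAdd.
        this.queue = new ArrayDeque<>(limit);
    }

    /** Returns false when the queue is full; the caller is then expected to close the connection. */
    public boolean tryAdd(T msg) {
        if (queue.size() >= limit) {
            return false;
        }
        queue.addLast(msg);
        return true;
    }

    /** Drops messages from the head until the remote's cumulative received count is covered. */
    public void acknowledge(long messagesReceivedByRemote) {
        while (acknowledgedCount < messagesReceivedByRemote) {
            if (queue.pollFirst() == null) {
                throw new IllegalStateException("Remote acknowledged more messages than were sent");
            }
            acknowledgedCount++;
        }
    }

    public int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        BoundedUnackedQueue<String> q = new BoundedUnackedQueue<>(2);
        System.out.println(q.tryAdd("a")); // true
        System.out.println(q.tryAdd("b")); // true
        System.out.println(q.tryAdd("c")); // false: here the caller would close the channel
        q.acknowledge(1);                  // the remote has received "a"
        System.out.println(q.tryAdd("c")); // true
    }
}
```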



[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #804: IGNITE-14085 Introduce network recovery protocol

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r871111753


##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundRecoveryHandler.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.network.OutNetworkObject;
+
+/** Outbound handler that adds outgoing message to the recovery descriptor. */
+public class OutboundRecoveryHandler extends ChannelOutboundHandlerAdapter {
+    /** Handler name. */
+    public static final String NAME = "outbound-recovery-handler";
+
+    /** Recovery descriptor. */
+    private final RecoveryDescriptor descriptor;
+
+    /**
+     * Constructor.
+     *
+     * @param descriptor Recovery descriptor.
+     */
+    public OutboundRecoveryHandler(RecoveryDescriptor descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+        OutNetworkObject outNetworkObject = (OutNetworkObject) msg;
+
+        if (outNetworkObject.shouldBeSavedForRecovery()) {
+            descriptor.add(outNetworkObject);

Review Comment:
   Good point. I believe I tried nullifying these fields and there were some issues with that, but I can't recall exactly what they were. Will look into that later.



[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #804: IGNITE-14085 Introduce network recovery protocol

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r871167437


##########
modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java:
##########
@@ -24,7 +24,7 @@
  */
 public interface NetworkMessage {
     /** Size of the message type (in bytes), used during (de-)serialization. */
-    static final int MSG_TYPE_SIZE_BYTES = 4;
+    int MSG_TYPE_SIZE_BYTES = 4;

Review Comment:
   No, just two shorts, no varlen yet



[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #804: IGNITE-14085 Introduce network recovery protocol

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r871116898


##########
modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java:
##########
@@ -24,7 +24,7 @@
  */
 public interface NetworkMessage {
     /** Size of the message type (in bytes), used during (de-)serialization. */
-    static final int MSG_TYPE_SIZE_BYTES = 4;
+    int MSG_TYPE_SIZE_BYTES = 4;

Review Comment:
   Not at the moment: there is no varlen compression in the direct marshaller.



[GitHub] [ignite-3] ibessonov commented on a diff in pull request #804: IGNITE-14085 Introduce network recovery protocol

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r871081859


##########
modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java:
##########
@@ -24,7 +24,7 @@
  */
 public interface NetworkMessage {
     /** Size of the message type (in bytes), used during (de-)serialization. */
-    static final int MSG_TYPE_SIZE_BYTES = 4;
+    int MSG_TYPE_SIZE_BYTES = 4;

Review Comment:
   Did we implement varlen type compression? The absolute majority of messages only require 2 bytes.
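To show what varlen type compression would buy: with two fixed-size shorts (group type and message type), the header is always `MSG_TYPE_SIZE_BYTES = 4`. If each short were written as an unsigned LEB128-style varint, any value below 128 would take one byte, so most messages would need only 2 bytes. The following is a hypothetical sketch of that encoding. It is not the direct marshaller's actual wire format, and the class and method names are illustrative.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

/**
 * Hypothetical sketch, not the direct marshaller's wire format: encodes the
 * (group type, message type) pair as two unsigned LEB128-style varints
 * instead of two fixed-size shorts.
 */
public class VarShortEncoding {
    /** Writes an unsigned 16-bit value using 7 bits per byte; values below 128 take one byte. */
    static void writeVarShort(ByteArrayOutputStream out, short value) {
        int v = Short.toUnsignedInt(value);
        while ((v & ~0x7F) != 0) {
            out.write((v & 0x7F) | 0x80); // low 7 bits plus a continuation flag
            v >>>= 7;
        }
        out.write(v);
    }

    /** Reads a value written by writeVarShort, advancing the buffer position. */
    static int readVarShort(ByteBuffer buf) {
        int result = 0;
        int shift = 0;
        while (true) {
            int b = buf.get() & 0xFF;
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
    }

    /** Encodes the message type header. */
    static byte[] encodeType(short groupType, short messageType) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarShort(out, groupType);
        writeVarShort(out, messageType);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] small = encodeType((short) 1, (short) 4);
        System.out.println(small.length); // 2, versus 4 for two fixed-size shorts
        ByteBuffer buf = ByteBuffer.wrap(small);
        System.out.println(readVarShort(buf) + " " + readVarShort(buf)); // 1 4
        System.out.println(encodeType((short) 1, (short) 300).length); // 3
    }
}
```

The trade-off is that large type values (128 and above) take 2 or 3 bytes, and decoding needs a loop instead of a single `getShort`.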



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -72,43 +76,41 @@ public class ConnectionManager {
     /** Message listeners. */
     private final List<Consumer<InNetworkObject>> listeners = new CopyOnWriteArrayList<>();
 
+    private final UUID launchId;

Review Comment:
   Please add a small javadoc



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Default implementation of the {@link RecoveryDescriptorProvider}.
+ */
+public class DefaultRecoveryDescriptorProvider implements RecoveryDescriptorProvider {
+    /** Recovery descriptors. */
+    private final Map<ChannelKey, RecoveryDescriptor> recoveryDescriptors = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override
+    public RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID launchId, short connectionIndex, boolean inbound) {
+        var key = new ChannelKey(consistentId, launchId, connectionIndex, inbound);
+
+        return recoveryDescriptors.computeIfAbsent(key, channelKey -> new RecoveryDescriptor(10));

Review Comment:
   What is "10"?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.util.Collections;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * Inbound handler that handles incoming acknowledgement messages and sends acknowledgement messages for other messages.
+ */
+public class InboundRecoveryHandler extends ChannelInboundHandlerAdapter {
+    /** Handler name. */
+    public static final String NAME = "inbound-recovery-handler";
+
+    /** Recovery descriptor. */
+    private final RecoveryDescriptor descriptor;
+
+    /** Messages factory. */
+    private final NetworkMessagesFactory factory;
+
+    /**
+     * Constructor.
+     *
+     * @param descriptor Recovery descriptor.
+     * @param factory Message factory.
+     */
+    public InboundRecoveryHandler(RecoveryDescriptor descriptor, NetworkMessagesFactory factory) {
+        this.descriptor = descriptor;
+        this.factory = factory;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        NetworkMessage message = (NetworkMessage) msg;
+
+        if (message instanceof AcknowledgementMessage) {
+            AcknowledgementMessage ackMessage = (AcknowledgementMessage) msg;
+            long receivedMessages = ackMessage.receivedMessages();
+
+            descriptor.acknowledge(receivedMessages);
+        } else if (message.needAck()) {
+            AcknowledgementMessage ackMsg = factory.acknowledgementMessage()

Review Comment:
   So, you're saying that every message that needs an ack will receive it individually, right?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundRecoveryHandler.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.network.OutNetworkObject;
+
+/** Outbound handler that adds outgoing message to the recovery descriptor. */
+public class OutboundRecoveryHandler extends ChannelOutboundHandlerAdapter {
+    /** Handler name. */
+    public static final String NAME = "outbound-recovery-handler";
+
+    /** Recovery descriptor. */
+    private final RecoveryDescriptor descriptor;
+
+    /**
+     * Constructor.
+     *
+     * @param descriptor Recovery descriptor.
+     */
+    public OutboundRecoveryHandler(RecoveryDescriptor descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+        OutNetworkObject outNetworkObject = (OutNetworkObject) msg;
+
+        if (outNetworkObject.shouldBeSavedForRecovery()) {
+            descriptor.add(outNetworkObject);

Review Comment:
   Another question: do we nullify serialized UOS fields? If we already have byte[] representations, then we don't need the original objects. And, considering that we cache a bunch of messages, they **will** take a lot of space.
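Here is a minimal sketch of the nullification the question describes. It assumes a hypothetical `CachedOutMessage` wrapper, not Ignite's `OutNetworkObject` or its UOS handling. The wrapper serializes once, keeps the `byte[]` for possible resending, and drops its reference to the original object. The memory saving only materializes if nothing else (for example, the original caller) still holds the object. The sketch is not thread-safe.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/**
 * Hypothetical sketch (not Ignite's OutNetworkObject): keeps the serialized form of a
 * message for resending and drops the reference to the original object once the bytes
 * exist, so a recovery queue retains only byte[] payloads. Not thread-safe.
 */
public class CachedOutMessage<T> {
    private T message;
    private byte[] serialized;

    public CachedOutMessage(T message) {
        this.message = message;
    }

    /** Serializes on the first call, caches the bytes, and nulls out the original object. */
    public byte[] bytes(Function<? super T, byte[]> serializer) {
        if (serialized == null) {
            serialized = serializer.apply(message);
            message = null; // resending only needs the bytes
        }
        return serialized;
    }

    public boolean hasOriginal() {
        return message != null;
    }

    public static void main(String[] args) {
        CachedOutMessage<String> msg = new CachedOutMessage<>("hello");
        byte[] first = msg.bytes(s -> s.getBytes(StandardCharsets.UTF_8));
        // A second call must reuse the cached bytes rather than serialize again.
        byte[] again = msg.bytes(s -> {
            throw new IllegalStateException("should not re-serialize");
        });
        System.out.println(first == again);     // true
        System.out.println(msg.hasOriginal()); // false
    }
}
```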



##########
modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java:
##########
@@ -57,6 +57,10 @@ public class NetworkMessageTypes {
      */
     public static final short HANDSHAKE_START_RESPONSE = 4;
 
+
+    public static final short HANDSHAKE_FINISH = 8;

Review Comment:
   Formatting is messed up. Why do we have a gap in these numbers BTW?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Default implementation of the {@link RecoveryDescriptorProvider}.
+ */
+public class DefaultRecoveryDescriptorProvider implements RecoveryDescriptorProvider {
+    /** Recovery descriptors. */
+    private final Map<ChannelKey, RecoveryDescriptor> recoveryDescriptors = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override
+    public RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID launchId, short connectionIndex, boolean inbound) {
+        var key = new ChannelKey(consistentId, launchId, connectionIndex, inbound);
+
+        return recoveryDescriptors.computeIfAbsent(key, channelKey -> new RecoveryDescriptor(10));
+    }
+
+    /** Channel key. */
+    private static class ChannelKey {
+        /** Remote node's consistent id. */
+        private final String consistentId;
+
+        /** Remote node's launch id. */
+        private final UUID launchId;
+
+        /**
+         * Connection id. Every connection between this node and a remote node has a unique connection id,
+         * but connections with different nodes may have same ids.
+         */
+        private final short connectionId;
+
+        /** {@code true} if channel is inbound, {@code false} otherwise. */
+        private final boolean inbound;
+
+        private ChannelKey(String consistentId, UUID launchId, short connectionId, boolean inbound) {
+            this.consistentId = consistentId;
+            this.launchId = launchId;
+            this.connectionId = connectionId;
+            this.inbound = inbound;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ChannelKey that = (ChannelKey) o;
+            return connectionId == that.connectionId && inbound == that.inbound && consistentId.equals(that.consistentId)
+                    && launchId.equals(
+                    that.launchId);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public int hashCode() {
+            return Objects.hash(consistentId, launchId, connectionId, inbound);

Review Comment:
   Does the JIT inline this method? I'm curious.
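Some background on the question. `Objects.hash` is a varargs method, so each call builds an `Object[]` and boxes the `short` and `boolean` fields. That allocation only goes away if the JIT inlines the call and escape analysis eliminates the array, which is not guaranteed. On HotSpot, one way to check is `-XX:+UnlockDiagnosticVMOptions -XX:+PrintInlining`. If the allocation matters, the hash can be unrolled by hand. The following sketch uses a hypothetical `ChannelKeySketch` mirroring the quoted `ChannelKey` fields. It produces the same value as `Objects.hash` for non-null fields, because `Objects.hash` delegates to `Arrays.hashCode` (start at 1, then `31 * result + element.hashCode()`).

```java
import java.util.Objects;
import java.util.UUID;

/**
 * Hypothetical variant of ChannelKey whose hashCode avoids Objects.hash's varargs array
 * and boxing while producing the same value for non-null fields.
 */
public final class ChannelKeySketch {
    private final String consistentId;
    private final UUID launchId;
    private final short connectionId;
    private final boolean inbound;

    public ChannelKeySketch(String consistentId, UUID launchId, short connectionId, boolean inbound) {
        this.consistentId = Objects.requireNonNull(consistentId);
        this.launchId = Objects.requireNonNull(launchId);
        this.connectionId = connectionId;
        this.inbound = inbound;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ChannelKeySketch)) {
            return false;
        }
        ChannelKeySketch that = (ChannelKeySketch) o;
        return connectionId == that.connectionId
                && inbound == that.inbound
                && consistentId.equals(that.consistentId)
                && launchId.equals(that.launchId);
    }

    /** Same formula as Arrays.hashCode (used by Objects.hash), unrolled with primitive hash helpers. */
    @Override
    public int hashCode() {
        int result = 1;
        result = 31 * result + consistentId.hashCode();
        result = 31 * result + launchId.hashCode();
        result = 31 * result + Short.hashCode(connectionId);
        result = 31 * result + Boolean.hashCode(inbound);
        return result;
    }

    public static void main(String[] args) {
        UUID launchId = UUID.randomUUID();
        ChannelKeySketch key = new ChannelKeySketch("node-1", launchId, (short) 0, true);
        // Matches the Objects.hash-based implementation.
        System.out.println(key.hashCode() == Objects.hash("node-1", launchId, (short) 0, true)); // true
    }
}
```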



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * Recovery protocol descriptor.
+ */
+public class RecoveryDescriptor {
+    /** Unacknowledged messages. */
+    private final ArrayDeque<OutNetworkObject> unacknowledgedMessages;
+
+    /** Count of sent messages. */
+    private long sentCount;
+
+    /** Count of acknowledged sent messages. */
+    private long acknowledgedCount;
+
+    /** Count of received messages. */
+    private long receivedCount;
+
+    /**
+     * Constructor.
+     *
+     * @param queueLimit Maximum size of unacknowledged messages queue.
+     */
+    public RecoveryDescriptor(int queueLimit) {
+        this.unacknowledgedMessages = new ArrayDeque<>(queueLimit);
+    }
+
+    /**
+     * Returns count of received messages.
+     *
+     * @return Count of received messages.
+     */
+    public long receivedCount() {
+        return receivedCount;
+    }
+
+    /**
+     * Acknowledges that sent messages were received by the remote node.
+     *
+     * @param messagesReceivedByRemote Number of all messages received by the remote node.
+     */
+    public void acknowledge(long messagesReceivedByRemote) {
+        while (acknowledgedCount < messagesReceivedByRemote) {
+            OutNetworkObject req = unacknowledgedMessages.pollFirst();
+
+            assert req != null;
+
+            acknowledgedCount++;
+        }
+    }
+
+    /**
+     * Returns the number of the messages unacknowledged by the remote node.
+     *
+     * @return The number of the messages unacknowledged by the remote node.
+     */
+    public long unacknowledgedCount() {
+        long res = sentCount - acknowledgedCount;
+
+        assert res >= 0;
+        assert res == unacknowledgedMessages.size();
+
+        return res;
+    }
+
+    /**
+     * Returns unacknowledged messages.
+     *
+     * @return Unacknowledged messages.
+     */
+    public List<OutNetworkObject> unacknowledgedMessages() {
+        return new ArrayList<>(unacknowledgedMessages);
+    }
+
+    /**
+     * Adds a sent message.
+     *
+     * @param msg Message.
+     */
+    public void add(OutNetworkObject msg) {
+        msg.shouldBeSavedForRecovery(false);
+        sentCount++;
+        unacknowledgedMessages.addLast(msg);

Review Comment:
   The queue has a size limit. What happens if a new message doesn't fit?
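A note on the code as quoted: `new ArrayDeque<>(queueLimit)` only sets the deque's initial capacity. The `ArrayDeque` Javadoc states that array deques have no capacity restrictions and grow as necessary. So, as written, a new message always fits and the queue grows silently. Nothing enforces the "maximum size" that the `queueLimit` parameter documents. The following small demo illustrates the JDK behavior. The `fill` helper is illustrative only.

```java
import java.util.ArrayDeque;

/**
 * Shows that ArrayDeque's int constructor argument is only an initial capacity hint:
 * the deque grows past it instead of rejecting new elements.
 */
public class ArrayDequeCapacityDemo {
    /** Creates a deque with the given initial capacity and adds count elements to it. */
    static int fill(int initialCapacity, int count) {
        ArrayDeque<Integer> deque = new ArrayDeque<>(initialCapacity);
        for (int i = 0; i < count; i++) {
            deque.addLast(i); // never rejected for capacity reasons
        }
        return deque.size();
    }

    public static void main(String[] args) {
        // Same initial capacity as `new RecoveryDescriptor(10)` in DefaultRecoveryDescriptorProvider.
        System.out.println(fill(10, 11));    // 11
        System.out.println(fill(10, 1_000)); // 1000
    }
}
```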



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -72,43 +76,41 @@ public class ConnectionManager {
     /** Message listeners. */
     private final List<Consumer<InNetworkObject>> listeners = new CopyOnWriteArrayList<>();
 
+    private final UUID launchId;
+
     /** Node consistent id. */
     private final String consistentId;
 
-    /** Client handshake manager factory. */
-    private final Supplier<HandshakeManager> clientHandshakeManagerFactory;
-
     /** Start flag. */
     private final AtomicBoolean started = new AtomicBoolean(false);
 
     /** Stop flag. */
     private final AtomicBoolean stopped = new AtomicBoolean(false);
 
+    private final RecoveryDescriptorProvider descriptorProvider = new DefaultRecoveryDescriptorProvider();
+
     /**
      * Constructor.
      *
      * @param networkConfiguration          Network configuration.
      * @param serializationService          Serialization service.
      * @param consistentId                  Consistent id of this node.
-     * @param serverHandshakeManagerFactory Server handshake manager factory.
-     * @param clientHandshakeManagerFactory Client handshake manager factory.
      * @param bootstrapFactory              Bootstrap factory.
      */
     public ConnectionManager(
             NetworkView networkConfiguration,
             SerializationService serializationService,
+            UUID launchId,

Review Comment:
   New parameter is not documented



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
+
+/** Pipeline utils. */
+public class PipelineUtils {
+    /** {@link ChunkedWriteHandler}'s name. */
+    private static final String CHUNKED_WRITE_HANDLER_NAME = "chunked-write-handler";
+
+    /**
+     * Sets up initial pipeline.
+     *
+     * @param pipeline Channel pipeline.
+     * @param serializationService Serialization service.
+     * @param handshakeManager Handshake manager.
+     * @param messageListener Message listener.
+     */
+    public static void setup(ChannelPipeline pipeline, PerSessionSerializationService serializationService,
+                HandshakeManager handshakeManager, Consumer<InNetworkObject> messageListener) {
+        pipeline.addLast(InboundDecoder.NAME, new InboundDecoder(serializationService));
+        pipeline.addLast(HandshakeHandler.NAME, new HandshakeHandler(handshakeManager, messageListener, serializationService));
+        pipeline.addLast(CHUNKED_WRITE_HANDLER_NAME, new ChunkedWriteHandler());
+        pipeline.addLast(OutboundEncoder.NAME, new OutboundEncoder(serializationService));
+        pipeline.addLast(IoExceptionSuppressingHandler.NAME, new IoExceptionSuppressingHandler());
+    }
+
+    /**
+     * Changes pipeline after the handshake.
+     *
+     * @param pipeline Pipeline.
+     * @param descriptor Recovery descriptor.
+     * @param messageHandler Message handler.
+     * @param factory Message factory.
+     */
+    public static void afterHandshake(
+            ChannelPipeline pipeline,
+            RecoveryDescriptor descriptor,
+            MessageHandler messageHandler,
+            NetworkMessagesFactory factory
+    ) {
+        pipeline.addAfter(OutboundEncoder.NAME, OutboundRecoveryHandler.NAME, new OutboundRecoveryHandler(descriptor));
+        pipeline.addBefore(HandshakeHandler.NAME, InboundRecoveryHandler.NAME, new InboundRecoveryHandler(descriptor, factory));
+        pipeline.addAfter(HandshakeHandler.NAME, MessageHandler.NAME, messageHandler);
+    }
+

Review Comment:
   Please remove these empty lines :)



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java:
##########
@@ -0,0 +1,72 @@
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.util.Collections;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * Inbound handler that handles incoming acknowledgement messages and sends acknowledgement messages for other messages.
+ */
+public class InboundRecoveryHandler extends ChannelInboundHandlerAdapter {
+    /** Handler name. */
+    public static final String NAME = "inbound-recovery-handler";
+
+    /** Recovery descriptor. */
+    private final RecoveryDescriptor descriptor;
+
+    /** Messages factory. */
+    private final NetworkMessagesFactory factory;
+
+    /**
+     * Constructor.
+     *
+     * @param descriptor Recovery descriptor.
+     * @param factory Message factory.
+     */
+    public InboundRecoveryHandler(RecoveryDescriptor descriptor, NetworkMessagesFactory factory) {
+        this.descriptor = descriptor;
+        this.factory = factory;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        NetworkMessage message = (NetworkMessage) msg;
+
+        if (message instanceof AcknowledgementMessage) {
+            AcknowledgementMessage ackMessage = (AcknowledgementMessage) msg;
+            long receivedMessages = ackMessage.receivedMessages();
+
+            descriptor.acknowledge(receivedMessages);
+        } else if (message.needAck()) {
+            AcknowledgementMessage ackMsg = factory.acknowledgementMessage()

Review Comment:
   I believe we should revisit this, maybe not in this PR. A node should passively send its received-messages counter every N messages or when idle. It would be cool to compare the current approach with the "new" one by measuring throughput and latency percentiles.

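A minimal sketch of the passive acknowledgement strategy proposed above, assuming a cumulative received counter like the one `AcknowledgementMessage.receivedMessages()` carries. `PassiveAckPolicy`, `onMessageReceived` and `onIdle` are hypothetical names, not part of this PR; in a Netty pipeline `onIdle` could be driven by an idle-state event (for example from Netty's `IdleStateHandler`), but that wiring is not shown.

```java
import java.util.OptionalLong;

/**
 * Hypothetical receiver-side policy: acknowledge with a cumulative counter every
 * batchSize messages, or when the channel goes idle, instead of after every message.
 */
final class PassiveAckPolicy {
    private final int batchSize;

    /** Total number of messages received on this connection. */
    private long receivedCount;

    /** Value of receivedCount carried by the last acknowledgement. */
    private long lastAckedCount;

    PassiveAckPolicy(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        this.batchSize = batchSize;
    }

    /** Called for every received message; returns the counter to send once a batch is complete. */
    OptionalLong onMessageReceived() {
        receivedCount++;
        return receivedCount - lastAckedCount >= batchSize ? ack() : OptionalLong.empty();
    }

    /** Called when the channel is idle; acknowledges anything received since the last ack. */
    OptionalLong onIdle() {
        return receivedCount > lastAckedCount ? ack() : OptionalLong.empty();
    }

    private OptionalLong ack() {
        lastAckedCount = receivedCount;
        return OptionalLong.of(receivedCount);
    }

    public static void main(String[] args) {
        PassiveAckPolicy policy = new PassiveAckPolicy(3);

        for (int i = 0; i < 4; i++) {
            // Prints "ack after batch: 3" once, after the third message.
            policy.onMessageReceived().ifPresent(n -> System.out.println("ack after batch: " + n));
        }

        policy.onIdle().ifPresent(n -> System.out.println("ack on idle: " + n)); // prints "ack on idle: 4"
        System.out.println("nothing left to ack: " + policy.onIdle().isEmpty()); // prints "nothing left to ack: true"
    }
}
```

Because the counter is cumulative, one acknowledgement covers every message received before it, so batching loses no information. The cost is that the sender keeps messages queued longer, so the batch size presumably has to stay compatible with the sender's queue limit (`DEFAULT_QUEUE_LIMIT` in `DefaultRecoveryDescriptorProvider`).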




[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #804: IGNITE-14085 Introduce network recovery protocol

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r872316416


##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -221,17 +226,17 @@ private void onNewIncomingChannel(NettySender channel) {
      * @param address Target address.
      * @return New netty client.
      */
-    private NettyClient connect(SocketAddress address) {
+    private NettyClient connect(SocketAddress address, short connectionId) {
         var client = new NettyClient(
                 address,
                 serializationService,
-                clientHandshakeManagerFactory.get(),
+                createClientHandshakeManager(connectionId),
                 this::onMessage
         );
 
-        client.start(clientBootstrap).whenComplete((sender, throwable) -> {
+        client.start(clientBootstrap).whenComplete((nodeInfo, throwable) -> {
             if (throwable == null) {
-                channels.put(sender.consistentId(), sender);
+                // No-op

Review Comment:
   Sometimes I surprise even myself with such bad code





[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #804: IGNITE-14085 Introduce network recovery protocol

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r872309984


##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -221,17 +226,17 @@ private void onNewIncomingChannel(NettySender channel) {
      * @param address Target address.
      * @return New netty client.
      */
-    private NettyClient connect(SocketAddress address) {
+    private NettyClient connect(SocketAddress address, short connectionId) {

Review Comment:
   Yes, I will add a TODO here





[GitHub] [ignite-3] SammyVimes closed pull request #804: IGNITE-14085 Introduce network recovery protocol

Posted by GitBox <gi...@apache.org>.
SammyVimes closed pull request #804: IGNITE-14085 Introduce network recovery protocol
URL: https://github.com/apache/ignite-3/pull/804




[GitHub] [ignite-3] sashapolo commented on a diff in pull request #804: IGNITE-14085 Introduce network recovery protocol

Posted by GitBox <gi...@apache.org>.
sashapolo commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r871562173


##########
modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java:
##########
@@ -28,29 +27,23 @@
  */
 public interface HandshakeManager {
     /**
-     * Initialize handshake manager with the channel.
+     * Initializes handshake manager with the channel.
      *
-     * @param channel Channel.
-     * @return Action to perform by {@link HandshakeHandler}.
+     * @param handlerContext Channel handler context.
      */
-    HandshakeResult init(Channel channel);
+    void onInit(ChannelHandlerContext handlerContext);

Review Comment:
   Why did you rename this method?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -75,40 +79,41 @@ public class ConnectionManager {
     /** Node consistent id. */
     private final String consistentId;
 
-    /** Client handshake manager factory. */
-    private final Supplier<HandshakeManager> clientHandshakeManagerFactory;
+    /** Node launch id. As opposed to {@link #consistentId}, this identifier changes between restarts. */
+    private final UUID launchId;
 
     /** Start flag. */
     private final AtomicBoolean started = new AtomicBoolean(false);
 
     /** Stop flag. */
     private final AtomicBoolean stopped = new AtomicBoolean(false);
 
+    /** Recovery descriptor provider. */
+    private final RecoveryDescriptorProvider descriptorProvider = new DefaultRecoveryDescriptorProvider();

Review Comment:
   I would suggest renaming `RecoveryDescriptorProvider` to `RecoveryDescriptorFactory`.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -284,6 +289,14 @@ public boolean isStopped() {
         return stopped.get();
     }
 
+    protected HandshakeManager createClientHandshakeManager(short connectionId) {
+        return new RecoveryClientHandshakeManager(launchId, consistentId, connectionId, FACTORY, descriptorProvider);
+    }
+
+    protected HandshakeManager createServerHandshakeManager() {

Review Comment:
   ```suggestion
       private HandshakeManager createServerHandshakeManager() {
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -221,17 +226,17 @@ private void onNewIncomingChannel(NettySender channel) {
      * @param address Target address.
      * @return New netty client.
      */
-    private NettyClient connect(SocketAddress address) {
+    private NettyClient connect(SocketAddress address, short connectionId) {
         var client = new NettyClient(
                 address,
                 serializationService,
-                clientHandshakeManagerFactory.get(),
+                createClientHandshakeManager(connectionId),
                 this::onMessage
         );
 
-        client.start(clientBootstrap).whenComplete((sender, throwable) -> {
+        client.start(clientBootstrap).whenComplete((nodeInfo, throwable) -> {
             if (throwable == null) {
-                channels.put(sender.consistentId(), sender);
+                // No-op

Review Comment:
   I guess we can simply write:
   ```
   if (throwable != null) {
       clients.remove(address);
   }
   ```
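A sketch of the cleanup suggested above, written against a hypothetical, simplified registry (`ClientRegistry`, with generic address type `A` and sender type `S`) rather than the real `ConnectionManager`, whose `clients` map and `NettyClient` type are only partly visible in this diff. Only the failure branch does anything; success needs no handling.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
 * Hypothetical, simplified client registry: at most one connection attempt per address,
 * and a failed attempt is removed so that a later call can retry.
 */
final class ClientRegistry<A, S> {
    private final Map<A, CompletableFuture<S>> clients = new ConcurrentHashMap<>();

    /** Returns the existing connection attempt for the address or starts a new one. */
    CompletableFuture<S> connect(A address, Function<A, CompletableFuture<S>> starter) {
        CompletableFuture<S> future = clients.computeIfAbsent(address, starter);

        // Registered outside computeIfAbsent: if the future is already complete the callback
        // runs immediately, and a ConcurrentHashMap must not be modified from its own
        // mapping function.
        future.whenComplete((sender, throwable) -> {
            if (throwable != null) {
                // Remove only if the map still holds this attempt, not a newer one.
                clients.remove(address, future);
            }
        });

        return future;
    }

    boolean hasClient(A address) {
        return clients.containsKey(address);
    }
}
```

The two-argument `remove(key, value)` is a small deviation from the plain `clients.remove(address)` in the suggestion; it guards against removing a newer attempt that replaced the failed one in the meantime.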



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java:
##########
@@ -0,0 +1,109 @@
+package org.apache.ignite.internal.network.netty;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Default implementation of the {@link RecoveryDescriptorProvider}.
+ */
+public class DefaultRecoveryDescriptorProvider implements RecoveryDescriptorProvider {
+    // TODO: IGNITE-16954 Make this configurable
+    private static final int DEFAULT_QUEUE_LIMIT = 10;
+
+    /** Recovery descriptors. */
+    private final Map<ChannelKey, RecoveryDescriptor> recoveryDescriptors = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override
+    public RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID launchId, short connectionIndex, boolean inbound) {
+        var key = new ChannelKey(consistentId, launchId, connectionIndex, inbound);
+
+        return recoveryDescriptors.computeIfAbsent(key, channelKey -> new RecoveryDescriptor(DEFAULT_QUEUE_LIMIT));
+    }
+
+    /** Channel key. */
+    private static class ChannelKey {
+        /** Remote node's consistent id. */
+        private final String consistentId;
+
+        /** Remote node's launch id. */
+        private final UUID launchId;
+
+        /**
+         * Connection id. Every connection between this node and a remote node has a unique connection id,
+         * but connections with different nodes may have same ids.

Review Comment:
   ```suggestion
            * but connections with different nodes may have same ids.
   ```
   ```suggestion
            * but connections with different nodes may have the same ids.
   ```
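The diff excerpt stops before `ChannelKey`'s `equals`/`hashCode`, which `computeIfAbsent` relies on: without value-based equality every lookup would create a fresh descriptor. The sketch below assumes equality over all four key components; `DescriptorRegistry` and `Descriptor` are hypothetical stand-ins for `DefaultRecoveryDescriptorProvider` and `RecoveryDescriptor`.

```java
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for RecoveryDescriptor. */
final class Descriptor {
    final int queueLimit;

    Descriptor(int queueLimit) {
        this.queueLimit = queueLimit;
    }
}

/** Hypothetical stand-in for DefaultRecoveryDescriptorProvider. */
final class DescriptorRegistry {
    private static final int QUEUE_LIMIT = 10;

    private final Map<ChannelKey, Descriptor> descriptors = new ConcurrentHashMap<>();

    Descriptor get(String consistentId, UUID launchId, short connectionIndex, boolean inbound) {
        var key = new ChannelKey(consistentId, launchId, connectionIndex, inbound);

        return descriptors.computeIfAbsent(key, k -> new Descriptor(QUEUE_LIMIT));
    }

    /** Value-based key: two keys are equal when all four components are equal. */
    private static final class ChannelKey {
        private final String consistentId;
        private final UUID launchId;
        private final short connectionIndex;
        private final boolean inbound;

        ChannelKey(String consistentId, UUID launchId, short connectionIndex, boolean inbound) {
            this.consistentId = Objects.requireNonNull(consistentId);
            this.launchId = Objects.requireNonNull(launchId);
            this.connectionIndex = connectionIndex;
            this.inbound = inbound;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof ChannelKey)) {
                return false;
            }
            ChannelKey that = (ChannelKey) o;
            return connectionIndex == that.connectionIndex
                    && inbound == that.inbound
                    && consistentId.equals(that.consistentId)
                    && launchId.equals(that.launchId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(consistentId, launchId, connectionIndex, inbound);
        }
    }
}
```

Because the remote launch id is part of the key, a restarted peer (which gets a new launch id, per the `launchId` field comment in `ConnectionManager`) maps to a fresh descriptor instead of inheriting the previous incarnation's recovery state.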



##########
modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java:
##########
@@ -0,0 +1,436 @@
+package org.apache.ignite.internal.network.netty;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
+import org.apache.ignite.internal.network.serialization.SerializationService;
+import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
+import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.OutNetworkObject;
+import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
+import org.apache.ignite.network.TestMessagesFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Recovery protocol handshake flow test.
+ */
+public class RecoveryHandshakeTest {
+    /** Connection id. */
+    private static final short CONNECTION_ID = 1337;
+
+    /** Serialization registry. */
+    private static final MessageSerializationRegistry MESSAGE_REGISTRY = new TestMessageSerializationRegistryImpl();
+
+    /** Message factory. */
+    private static final NetworkMessagesFactory MESSAGE_FACTORY = new NetworkMessagesFactory();
+
+    /** Test message factory. */
+    private static final TestMessagesFactory TEST_MESSAGES_FACTORY = new TestMessagesFactory();
+
+    @Test
+    public void testHandshake() throws Exception {
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        var clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);

Review Comment:
   Please check the `var` usages in this class; many of them are illegal.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -60,6 +63,7 @@ public class ConnectionManager {
     /** Server. */
     private final NettyServer server;
 
+    // TODO: IGNITE-16948 Should be a map consistentId -> connectionId -> sender

Review Comment:
   IGNITE-16948 does not describe why this should be a different map
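One possible shape for the map described in the TODO, sketched with a hypothetical `SenderRegistry` (the sender type is left generic). The motivation assumed here, that a node may hold several connections to the same peer distinguished by connection id, is an inference from the `connectionId` parameter of `connect` and the `ChannelKey` comment ("Every connection between this node and a remote node has a unique connection id"); IGNITE-16948 itself does not state it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical two-level registry: consistentId -> connectionId -> sender.
 * S stands in for the sender type (e.g. NettySender).
 */
final class SenderRegistry<S> {
    private final Map<String, Map<Short, S>> senders = new ConcurrentHashMap<>();

    /** Registers a sender and returns the one previously registered for that connection, or null. */
    S put(String consistentId, short connectionId, S sender) {
        return senders.computeIfAbsent(consistentId, id -> new ConcurrentHashMap<>())
                .put(connectionId, sender);
    }

    /** Returns the sender for one specific connection to the node, or null. */
    S get(String consistentId, short connectionId) {
        Map<Short, S> byConnection = senders.get(consistentId);

        // connectionId is a short, so it boxes to Short and matches the keys; an int
        // argument would box to Integer and never match.
        return byConnection == null ? null : byConnection.get(connectionId);
    }

    /** Number of connections currently registered for the node. */
    int connectionCount(String consistentId) {
        Map<Short, S> byConnection = senders.get(consistentId);
        return byConnection == null ? 0 : byConnection.size();
    }
}
```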



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -221,17 +226,17 @@ private void onNewIncomingChannel(NettySender channel) {
      * @param address Target address.
      * @return New netty client.
      */
-    private NettyClient connect(SocketAddress address) {
+    private NettyClient connect(SocketAddress address, short connectionId) {

Review Comment:
   `connectionId` parameter is always 0, do I understand correctly that it will be changed under IGNITE-16948?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/BaseRecoveryHandshakeManager.java:
##########
@@ -0,0 +1,101 @@
+package org.apache.ignite.internal.network.recovery;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.netty.HandshakeHandler;
+import org.apache.ignite.internal.network.netty.MessageHandler;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Base recovery handshake manager.
+ */
+public abstract class BaseRecoveryHandshakeManager implements HandshakeManager {
+    /** Handshake completion future. */
+    protected final CompletableFuture<NettySender> handshakeCompleteFuture = new CompletableFuture<>();
+
+    /** Remote node's launch id. */
+    protected UUID remoteLaunchId;
+
+    /** Remote node's consistent id. */
+    protected String remoteConsistentId;
+
+    /** Connection id. */
+    protected short connectionId;
+
+    /** Netty pipeline channel handler context. */
+    protected ChannelHandlerContext ctx;
+
+    /** Channel. */
+    protected Channel channel;
+
+    /** Netty pipeline handshake handler. */
+    protected HandshakeHandler handler;
+
+    /** Recovery descriptor. */
+    protected RecoveryDescriptor recoveryDescriptor;
+
+    /** {@inheritDoc} */
+    @Override
+    public void onInit(ChannelHandlerContext handlerContext) {
+        this.ctx = handlerContext;
+        this.channel = handlerContext.channel();
+        this.handler = (HandshakeHandler) ctx.handler();
+    }
+
+    /**
+     * Creates a message handler using the consistent id of a remote node.
+     *
+     * @return New message handler.
+     */
+    public MessageHandler createMessageHandler() {
+        return handler.createMessageHandler(remoteConsistentId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NettySender> handshakeFuture() {
+        return handshakeCompleteFuture;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void onConnectionOpen() {

Review Comment:
   Why do you need to override this method here?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/BaseRecoveryHandshakeManager.java:
##########
@@ -0,0 +1,101 @@
+package org.apache.ignite.internal.network.recovery;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.netty.HandshakeHandler;
+import org.apache.ignite.internal.network.netty.MessageHandler;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Base recovery handshake manager.
+ */
+public abstract class BaseRecoveryHandshakeManager implements HandshakeManager {
+    /** Handshake completion future. */
+    protected final CompletableFuture<NettySender> handshakeCompleteFuture = new CompletableFuture<>();
+
+    /** Remote node's launch id. */
+    protected UUID remoteLaunchId;
+
+    /** Remote node's consistent id. */
+    protected String remoteConsistentId;
+
+    /** Connection id. */
+    protected short connectionId;

Review Comment:
   This field is not used in this class; I would suggest encapsulating it in the child classes. The same applies to `recoveryDescriptor`.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java:
##########
@@ -28,29 +27,23 @@
  */
 public interface HandshakeManager {
     /**
-     * Initialize handshake manager with the channel.
+     * Initializes handshake manager with the channel.

Review Comment:
   ```suggestion
        * Initializes the handshake manager.
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java:
##########
@@ -87,34 +83,75 @@ public HandshakeResult onConnectionOpen(Channel channel) {
                 );
             }
         });
-
-        return HandshakeResult.noOp();
     }
 
     /** {@inheritDoc} */
     @Override
-    public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
+    public void onMessage(NetworkMessage message) {
         if (message instanceof HandshakeStartResponseMessage) {
             HandshakeStartResponseMessage msg = (HandshakeStartResponseMessage) message;
 
-            UUID remoteLaunchId = msg.launchId();
-            String remoteConsistentId = msg.consistentId();
+            this.remoteLaunchId = msg.launchId();
+            this.remoteConsistentId = msg.consistentId();
+            this.receivedCount = msg.receivedCount();
+            this.connectionId = msg.connectionId();
+
+            this.recoveryDescriptor = recoveryDescriptorProvider.getRecoveryDescriptor(remoteConsistentId, remoteLaunchId,
+                    connectionId, true);
 
-            handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId));
+            handshake(recoveryDescriptor);
 
-            return HandshakeResult.removeHandler(remoteLaunchId, remoteConsistentId);
+            return;
         }
 
-        handshakeCompleteFuture.completeExceptionally(
-                new HandshakeException("Unexpected message during handshake: " + message.toString())
-        );
+        assert recoveryDescriptor != null : "Wrong server handshake flow";
+
+        if (recoveryDescriptor.unacknowledgedCount() == 0) {
+            finishHandshake();
+        }
 
-        return HandshakeResult.fail();
+        ctx.fireChannelRead(message);
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<NettySender> handshakeFuture() {
-        return handshakeCompleteFuture;
+    private void handshake(RecoveryDescriptor descriptor) {
+        PipelineUtils.afterHandshake(ctx.pipeline(), descriptor, createMessageHandler(), messageFactory);
+
+        HandshakeFinishMessage response = messageFactory.handshakeFinishMessage()
+                .receivedCount(descriptor.receivedCount())
+                .build();
+
+        CompletableFuture<Void> sendFuture = NettyUtils.toCompletableFuture(
+                ctx.channel().writeAndFlush(new OutNetworkObject(response, Collections.emptyList(), false))
+        );
+
+        descriptor.acknowledge(receivedCount);
+
+        int unacknowledgedCount = (int) descriptor.unacknowledgedCount();
+
+        if (unacknowledgedCount > 0) {
+            var futs = new CompletableFuture[unacknowledgedCount + 1];
+            futs[0] = sendFuture;
+
+            List<OutNetworkObject> networkMessages = descriptor.unacknowledgedMessages();
+
+            for (int i = 0; i < networkMessages.size(); i++) {
+                OutNetworkObject networkMessage = networkMessages.get(i);
+                futs[i + 1] = NettyUtils.toCompletableFuture(ctx.channel().writeAndFlush(networkMessage));

Review Comment:
   Should we use `write` here and only `flush` at the end?
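A sketch of the batching this comment asks about: buffer every message with `write` and issue a single `flush`, so the resent batch goes out in one flush rather than one per message. Netty's `Channel` does provide `write`, `flush` and `writeAndFlush`; to keep the example self-contained, `BufferedChannel` below is a hypothetical stand-in that models only that buffering behaviour, and `SendQueue` is a simplified, assumed model of how `RecoveryDescriptor.acknowledge(receivedCount)` trims the unacknowledged queue before the resend.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical channel model: write() only buffers, flush() sends everything buffered. */
final class BufferedChannel {
    final List<String> wire = new ArrayList<>();
    int flushCount;

    private final List<String> pending = new ArrayList<>();
    private final List<CompletableFuture<Void>> pendingFutures = new ArrayList<>();

    CompletableFuture<Void> write(String message) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.add(message);
        pendingFutures.add(future);
        return future;
    }

    void flush() {
        flushCount++;
        wire.addAll(pending);
        pending.clear();

        // Copy before completing, in case a callback writes again.
        List<CompletableFuture<Void>> written = new ArrayList<>(pendingFutures);
        pendingFutures.clear();
        written.forEach(f -> f.complete(null));
    }
}

/** Simplified sender-side queue: messages stay queued until the peer acknowledges them. */
final class SendQueue {
    private final Deque<String> unacknowledged = new ArrayDeque<>();
    private long acknowledgedCount;

    void onSent(String message) {
        unacknowledged.addLast(message);
    }

    /** Drops the oldest messages covered by the peer's cumulative received counter. */
    void acknowledge(long receivedByPeer) {
        while (acknowledgedCount < receivedByPeer && !unacknowledged.isEmpty()) {
            unacknowledged.pollFirst();
            acknowledgedCount++;
        }
    }

    List<String> unacknowledgedMessages() {
        return new ArrayList<>(unacknowledged);
    }
}

final class Resender {
    /** Queues the handshake response and every unacknowledged message, then flushes once. */
    static CompletableFuture<Void> resend(BufferedChannel channel, String handshakeResponse, SendQueue queue) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        futures.add(channel.write(handshakeResponse));

        for (String message : queue.unacknowledgedMessages()) {
            futures.add(channel.write(message));
        }

        channel.flush(); // one flush for the whole batch instead of one per message

        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }
}
```

In the real handler the equivalent change would presumably be `ctx.channel().write(networkMessage)` inside the loop and a single `ctx.channel().flush()` after it, with the per-message futures still combined as the `futs` array already does.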



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -221,17 +226,17 @@ private void onNewIncomingChannel(NettySender channel) {
      * @param address Target address.
      * @return New netty client.
      */
-    private NettyClient connect(SocketAddress address) {
+    private NettyClient connect(SocketAddress address, short connectionId) {
         var client = new NettyClient(
                 address,
                 serializationService,
-                clientHandshakeManagerFactory.get(),
+                createClientHandshakeManager(connectionId),
                 this::onMessage
         );
 
-        client.start(clientBootstrap).whenComplete((sender, throwable) -> {
+        client.start(clientBootstrap).whenComplete((nodeInfo, throwable) -> {
             if (throwable == null) {
-                channels.put(sender.consistentId(), sender);

Review Comment:
   Why did you remove this line?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -221,17 +226,17 @@ private void onNewIncomingChannel(NettySender channel) {
      * @param address Target address.
      * @return New netty client.
      */
-    private NettyClient connect(SocketAddress address) {
+    private NettyClient connect(SocketAddress address, short connectionId) {
         var client = new NettyClient(
                 address,
                 serializationService,
-                clientHandshakeManagerFactory.get(),
+                createClientHandshakeManager(connectionId),
                 this::onMessage
         );
 
-        client.start(clientBootstrap).whenComplete((sender, throwable) -> {
+        client.start(clientBootstrap).whenComplete((nodeInfo, throwable) -> {

Review Comment:
   `nodeInfo`?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -284,6 +289,14 @@ public boolean isStopped() {
         return stopped.get();
     }
 
+    protected HandshakeManager createClientHandshakeManager(short connectionId) {

Review Comment:
   ```suggestion
       private HandshakeManager createClientHandshakeManager(short connectionId) {
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java:
##########
@@ -87,34 +83,75 @@ public HandshakeResult onConnectionOpen(Channel channel) {
                 );
             }
         });
-
-        return HandshakeResult.noOp();
     }
 
     /** {@inheritDoc} */
     @Override
-    public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
+    public void onMessage(NetworkMessage message) {
         if (message instanceof HandshakeStartResponseMessage) {
             HandshakeStartResponseMessage msg = (HandshakeStartResponseMessage) message;
 
-            UUID remoteLaunchId = msg.launchId();
-            String remoteConsistentId = msg.consistentId();
+            this.remoteLaunchId = msg.launchId();
+            this.remoteConsistentId = msg.consistentId();
+            this.receivedCount = msg.receivedCount();
+            this.connectionId = msg.connectionId();
+
+            this.recoveryDescriptor = recoveryDescriptorProvider.getRecoveryDescriptor(remoteConsistentId, remoteLaunchId,
+                    connectionId, true);
 
-            handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId));
+            handshake(recoveryDescriptor);
 
-            return HandshakeResult.removeHandler(remoteLaunchId, remoteConsistentId);
+            return;
         }
 
-        handshakeCompleteFuture.completeExceptionally(
-                new HandshakeException("Unexpected message during handshake: " + message.toString())
-        );
+        assert recoveryDescriptor != null : "Wrong server handshake flow";
+
+        if (recoveryDescriptor.unacknowledgedCount() == 0) {
+            finishHandshake();
+        }
 
-        return HandshakeResult.fail();
+        ctx.fireChannelRead(message);
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<NettySender> handshakeFuture() {
-        return handshakeCompleteFuture;
+    private void handshake(RecoveryDescriptor descriptor) {
+        PipelineUtils.afterHandshake(ctx.pipeline(), descriptor, createMessageHandler(), messageFactory);
+
+        HandshakeFinishMessage response = messageFactory.handshakeFinishMessage()
+                .receivedCount(descriptor.receivedCount())
+                .build();
+
+        CompletableFuture<Void> sendFuture = NettyUtils.toCompletableFuture(
+                ctx.channel().writeAndFlush(new OutNetworkObject(response, Collections.emptyList(), false))
+        );
+
+        descriptor.acknowledge(receivedCount);
+
+        int unacknowledgedCount = (int) descriptor.unacknowledgedCount();

Review Comment:
   Why did you make `unacknowledgedCount` a `long` if you cast it to `int`?
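
   To illustrate the concern: a plain `(int)` cast keeps only the low 32 bits of a `long`, so a count above `Integer.MAX_VALUE` silently becomes a wrong (here negative) number, while `Math.toIntExact` throws `ArithmeticException` instead. This is a standalone sketch; the class and method names are hypothetical and not taken from the PR.

   ```java
   public class LongToIntCast {
       /** Narrowing cast: silently keeps only the low 32 bits of the value. */
       public static int narrowUnchecked(long value) {
           return (int) value;
       }

       /** Checked narrowing: throws ArithmeticException if the value does not fit in an int. */
       public static int narrowChecked(long value) {
           return Math.toIntExact(value);
       }

       public static void main(String[] args) {
           long big = 3_000_000_000L; // larger than Integer.MAX_VALUE (2_147_483_647)
           System.out.println(narrowUnchecked(big)); // prints -1294967296
           try {
               narrowChecked(big);
           } catch (ArithmeticException e) {
               System.out.println("overflow detected");
           }
       }
   }
   ```

   If the count can never exceed the `int` range, declaring it as `int` avoids the cast; otherwise the checked variant makes an overflow visible.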



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/BaseRecoveryHandshakeManager.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.netty.HandshakeHandler;
+import org.apache.ignite.internal.network.netty.MessageHandler;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Base recovery handshake manager.
+ */
+public abstract class BaseRecoveryHandshakeManager implements HandshakeManager {

Review Comment:
   I actually find this class very confusing, and the inheritors' logic is very difficult to understand. Is it possible to get rid of it, even at the cost of some copy-paste?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * Recovery protocol descriptor.
+ */
+public class RecoveryDescriptor {
+    /** Unacknowledged messages. */
+    private final ArrayDeque<OutNetworkObject> unacknowledgedMessages;

Review Comment:
   ```suggestion
       private final Queue<OutNetworkObject> unacknowledgedMessages;
   ```
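
   For context, a minimal sketch of why the `Queue` interface is enough here, assuming (not confirmed by the PR) that `acknowledge(receivedCount)` receives the cumulative number of messages the remote side has received: the buffer only appends at the tail and removes from the head, which is exactly the `Queue` contract, so `ArrayDeque` can remain a private implementation detail. `UnackedBuffer` and its methods are hypothetical names.

   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Queue;

   /** Hypothetical, simplified buffer of sent-but-unacknowledged messages. */
   public class UnackedBuffer<T> {
       /** Declared as Queue: only tail appends and head removals are needed. */
       private final Queue<T> unacknowledged = new ArrayDeque<>();

       /** Cumulative number of messages the remote side has confirmed so far. */
       private long acknowledgedTotal;

       /** Keeps an outgoing message until the remote side confirms it. */
       public void add(T message) {
           unacknowledged.add(message);
       }

       /** Drops confirmed messages; receivedCount is assumed to be a cumulative total. */
       public void acknowledge(long receivedCount) {
           while (acknowledgedTotal < receivedCount && !unacknowledged.isEmpty()) {
               unacknowledged.poll();
               acknowledgedTotal++;
           }
       }

       /** Number of messages that would be resent after a reconnect. */
       public int unacknowledgedCount() {
           return unacknowledged.size();
       }

       /** Copy of the pending messages, oldest first. */
       public List<T> pending() {
           return new ArrayList<>(unacknowledged);
       }
   }
   ```

   Acknowledging the same cumulative count twice is a no-op in this sketch, and `Queue.size()` already returns an `int`.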



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -183,7 +188,7 @@ public CompletableFuture<NettySender> channel(@Nullable String consistentId, Soc
         // or didn't perform the handhsake operaton, can be reused.

Review Comment:
   ```suggestion
           // or didn't perform the handshake operation, can be reused.
   ```



##########
modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java:
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
+import org.apache.ignite.internal.network.serialization.SerializationService;
+import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
+import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.OutNetworkObject;
+import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
+import org.apache.ignite.network.TestMessagesFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Recovery protocol handshake flow test.
+ */
+public class RecoveryHandshakeTest {
+    /** Connection id. */
+    private static final short CONNECTION_ID = 1337;
+
+    /** Serialization registry. */
+    private static final MessageSerializationRegistry MESSAGE_REGISTRY = new TestMessageSerializationRegistryImpl();
+
+    /** Message factory. */
+    private static final NetworkMessagesFactory MESSAGE_FACTORY = new NetworkMessagesFactory();
+
+    /** Test message factory. */
+    private static final TestMessagesFactory TEST_MESSAGES_FACTORY = new TestMessagesFactory();
+
+    @Test
+    public void testHandshake() throws Exception {
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        var clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);
+        var serverHandshakeManager = createRecoveryServerHandshakeManager(serverRecovery);
+
+        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, noMessageListener);
+
+        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, noMessageListener);
+
+        assertTrue(serverSideChannel.isActive());
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        assertNull(clientSideChannel.readOutbound());
+        assertNull(serverSideChannel.readOutbound());
+
+        checkHandshakeCompleted(serverHandshakeManager);
+        checkHandshakeCompleted(clientHandshakeManager);
+
+        checkPipelineAfterHandshake(serverSideChannel);
+        checkPipelineAfterHandshake(clientSideChannel);
+    }
+
+    @Test
+    public void testHandshakeWithUnacknowledgedServerMessage() throws Exception {
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        UUID clientLaunchId = UUID.randomUUID();
+        RecoveryDescriptor serverRecoveryDescriptor = serverRecovery.getRecoveryDescriptor("client", clientLaunchId, CONNECTION_ID, true);
+        addUnacknowledgedMessages(serverRecoveryDescriptor);
+
+        var clientHandshakeManager = createRecoveryClientHandshakeManager("client", clientLaunchId, clientRecovery);
+        var serverHandshakeManager = createRecoveryServerHandshakeManager(serverRecovery);
+
+        var messageCaptor = new AtomicReference<TestMessage>();
+        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, (inObject) -> {
+            NetworkMessage msg = inObject.message();
+
+            assertInstanceOf(TestMessage.class, msg);
+
+            messageCaptor.set((TestMessage) msg);
+        });
+
+        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, noMessageListener);
+
+        assertTrue(serverSideChannel.isActive());
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        assertNull(clientSideChannel.readOutbound());
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        assertNull(serverSideChannel.readOutbound());
+
+        TestMessage ackedMessage = messageCaptor.get();
+        assertNotNull(ackedMessage);
+
+        checkHandshakeNotCompleted(serverHandshakeManager);
+        checkHandshakeCompleted(clientHandshakeManager);
+
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+
+        checkHandshakeCompleted(serverHandshakeManager);
+        checkHandshakeCompleted(clientHandshakeManager);
+
+        checkPipelineAfterHandshake(serverSideChannel);
+        checkPipelineAfterHandshake(clientSideChannel);
+    }
+
+    @Test
+    public void testHandshakeWithUnacknowledgedClientMessage() throws Exception {
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        UUID serverLaunchId = UUID.randomUUID();
+        RecoveryDescriptor clientRecoveryDescriptor = clientRecovery.getRecoveryDescriptor("server", serverLaunchId, CONNECTION_ID, false);
+        addUnacknowledgedMessages(clientRecoveryDescriptor);
+
+        var clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);
+        var serverHandshakeManager = createRecoveryServerHandshakeManager("server", serverLaunchId, serverRecovery);
+
+        var messageCaptor = new AtomicReference<TestMessage>();
+        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, noMessageListener);
+
+        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, (inObject) -> {
+            NetworkMessage msg = inObject.message();
+
+            assertInstanceOf(TestMessage.class, msg);
+
+            messageCaptor.set((TestMessage) msg);
+        });
+
+        assertTrue(serverSideChannel.isActive());
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        assertNull(serverSideChannel.readOutbound());
+
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        assertNull(clientSideChannel.readOutbound());
+
+        TestMessage ackedMessage = messageCaptor.get();
+        assertNotNull(ackedMessage);
+
+        checkHandshakeCompleted(serverHandshakeManager);
+        checkHandshakeNotCompleted(clientHandshakeManager);
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        checkHandshakeCompleted(serverHandshakeManager);
+        checkHandshakeCompleted(clientHandshakeManager);
+
+        checkPipelineAfterHandshake(serverSideChannel);
+        checkPipelineAfterHandshake(clientSideChannel);
+    }
+
+    @Test
+    public void testPairedRecoveryDescriptors() throws Exception {
+        RecoveryDescriptorProvider node1Recovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider node2Recovery = createRecoveryDescriptorProvider();
+
+        var node1Uuid = UUID.randomUUID();
+        var node2Uuid = UUID.randomUUID();
+
+        var chm1 = createRecoveryClientHandshakeManager("client", node1Uuid, node1Recovery);
+        var shm1 = createRecoveryServerHandshakeManager("client", node1Uuid, node1Recovery);
+
+        var chm2 = createRecoveryClientHandshakeManager("server", node2Uuid, node2Recovery);
+        var shm2 = createRecoveryServerHandshakeManager("server", node2Uuid, node2Recovery);
+
+        EmbeddedChannel out1to2 = setupChannel(chm1, noMessageListener);
+        EmbeddedChannel in1to2 = setupChannel(shm1, noMessageListener);
+        EmbeddedChannel out2to1 = setupChannel(chm2, noMessageListener);
+        EmbeddedChannel in2to1 = setupChannel(shm2, noMessageListener);
+
+        exchangeServerToClient(in1to2, out2to1);
+        exchangeServerToClient(in2to1, out1to2);
+
+        exchangeClientToServer(in1to2, out2to1);
+        exchangeClientToServer(in2to1, out1to2);
+
+        assertNotSame(chm1.recoveryDescriptor(), shm1.recoveryDescriptor());
+        assertNotSame(chm2.recoveryDescriptor(), shm2.recoveryDescriptor());
+    }
+
+    @Test
+    public void testExactlyOnceServer() throws Exception {
+        testExactlyOnce(true);
+    }
+
+    @Test
+    public void testExactlyOnceClient() throws Exception {
+        testExactlyOnce(false);
+    }
+
+    /**
+     * Tests that message was received exactly once in case if network failure during acknowledgement.
+     *
+     * @param serverDidntReceiveAck {@code true} if server didn't receive the acknowledgement, {@code false} if client didn't receive
+     *                              the acknowledgement.
+     * @throws Exception If failed.
+     */
+    private void testExactlyOnce(boolean serverDidntReceiveAck) throws Exception {
+        var server = "server";
+        var serverLaunchId = UUID.randomUUID();
+        var client = "client";
+        var clientLaunchId = UUID.randomUUID();
+
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        var clientHandshakeManager = createRecoveryClientHandshakeManager(client, clientLaunchId, clientRecovery);
+        var serverHandshakeManager = createRecoveryServerHandshakeManager(server, serverLaunchId, serverRecovery);
+
+        var receivedFirst = new AtomicBoolean();
+
+        var listener1 = new MessageListener("1", receivedFirst);
+
+        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, serverDidntReceiveAck ? listener1 : noMessageListener);
+        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, serverDidntReceiveAck ?  noMessageListener : listener1);
+
+        // Normal handshake
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        var ch = serverDidntReceiveAck ? serverSideChannel : clientSideChannel;
+
+        // Add two messages to the outbound
+        ch.writeOutbound(new OutNetworkObject(TEST_MESSAGES_FACTORY.testMessage().msg("1").build(), Collections.emptyList()));
+        ch.writeOutbound(new OutNetworkObject(TEST_MESSAGES_FACTORY.testMessage().msg("2").build(), Collections.emptyList()));
+
+        // Send one of the messages
+        if (serverDidntReceiveAck) {
+            exchangeServerToClient(serverSideChannel, clientSideChannel);
+        } else {
+            exchangeClientToServer(serverSideChannel, clientSideChannel);
+        }
+
+        // Message should be received
+        assertTrue(receivedFirst.get());
+
+        // Transfer only one acknowledgement, don't transfer the second one (simulates network failure on acknowledgement)
+        if (serverDidntReceiveAck) {
+            exchangeClientToServer(serverSideChannel, clientSideChannel);
+        } else {
+            exchangeServerToClient(serverSideChannel, clientSideChannel);
+        }
+
+        // Simulate reconnection
+        clientHandshakeManager = createRecoveryClientHandshakeManager(client, clientLaunchId, clientRecovery);
+        serverHandshakeManager = createRecoveryServerHandshakeManager(server, serverLaunchId, serverRecovery);
+
+        var receivedSecond = new AtomicBoolean();
+
+        var listener2 = new MessageListener("2", receivedSecond);
+
+        clientSideChannel = setupChannel(clientHandshakeManager, serverDidntReceiveAck ? listener2 : noMessageListener);
+        serverSideChannel = setupChannel(serverHandshakeManager, serverDidntReceiveAck ? noMessageListener : listener2);
+
+        // Handshake
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        // Resending message
+        if (serverDidntReceiveAck) {
+            exchangeServerToClient(serverSideChannel, clientSideChannel);
+        } else {
+            exchangeClientToServer(serverSideChannel, clientSideChannel);
+        }
+
+        // Send another acknowledgement
+        if (serverDidntReceiveAck) {
+            exchangeClientToServer(serverSideChannel, clientSideChannel);
+        } else {
+            exchangeServerToClient(serverSideChannel, clientSideChannel);
+        }
+
+        assertNull(serverSideChannel.readOutbound());
+        assertNull(clientSideChannel.readOutbound());
+
+        assertTrue(receivedSecond.get());
+    }
+
+    /** Message listener that accepts a specific message only once. */
+    private static class MessageListener implements Consumer<InNetworkObject> {
+        /** Expected message. */
+        private final String expectedMessage;
+
+        /** Flag indicating that expected messages was received. */
+        private final AtomicBoolean flag;
+
+        private MessageListener(String expectedMessage, AtomicBoolean flag) {
+            this.expectedMessage = expectedMessage;
+            this.flag = flag;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void accept(InNetworkObject inNetworkObject) {
+            var msg = (TestMessage) inNetworkObject.message();
+            if (expectedMessage.equals(msg.msg())) {
+                if (!flag.compareAndSet(false, true)) {
+                    fail();
+                }
+                return;
+            }
+            fail();
+        }
+    }
+
+    private void checkPipelineAfterHandshake(EmbeddedChannel channel) {
+        assertNull(channel.pipeline().get(HandshakeHandler.NAME));
+    }
+
+    private void checkHandshakeNotCompleted(HandshakeManager manager) {
+        CompletableFuture<NettySender> handshakeFuture = manager.handshakeFuture();
+        assertFalse(handshakeFuture.isDone());
+        assertFalse(handshakeFuture.isCompletedExceptionally());
+        assertFalse(handshakeFuture.isCancelled());
+    }
+
+    private void checkHandshakeCompleted(HandshakeManager manager) {
+        CompletableFuture<NettySender> handshakeFuture = manager.handshakeFuture();
+        assertTrue(handshakeFuture.isDone());
+        assertFalse(handshakeFuture.isCompletedExceptionally());
+        assertFalse(handshakeFuture.isCancelled());
+    }
+
+    private void addUnacknowledgedMessages(RecoveryDescriptor recoveryDescriptor) {
+        TestMessage msg = TEST_MESSAGES_FACTORY.testMessage().msg("test").build();
+        recoveryDescriptor.add(new OutNetworkObject(msg, Collections.emptyList()));
+    }
+
+    private void exchangeServerToClient(EmbeddedChannel serverSideChannel, EmbeddedChannel clientSideChannel) {
+        ByteBuf handshakeStartMessage = serverSideChannel.readOutbound();
+        clientSideChannel.writeInbound(handshakeStartMessage);
+    }
+
+    private void exchangeClientToServer(EmbeddedChannel serverSideChannel, EmbeddedChannel clientSideChannel) {
+        ByteBuf handshakeStartMessage = clientSideChannel.readOutbound();
+        serverSideChannel.writeInbound(handshakeStartMessage);
+    }
+
+    private final Consumer<InNetworkObject> noMessageListener = inNetworkObject ->
+            fail("Received message while shouldn't have, [" + inNetworkObject.message() + "]");
+
+    private EmbeddedChannel setupChannel(HandshakeManager handshakeManager, Consumer<InNetworkObject> messageListener) throws Exception {
+        // Channel should not be registered at first, not before we add pipeline handlers
+        // Otherwise, events like "channel active" won't be propagated to the handlers
+        var channel = new EmbeddedChannel(false, false);
+
+        var serializationService = new SerializationService(MESSAGE_REGISTRY, createUserObjectSerializationContext());
+        var sessionSerializationService = new PerSessionSerializationService(serializationService);
+
+        PipelineUtils.setup(channel.pipeline(), sessionSerializationService, handshakeManager, messageListener);
+
+        channel.register();
+
+        return channel;
+    }
+
+    private UserObjectSerializationContext createUserObjectSerializationContext() {
+        var userObjectDescriptorRegistry = new ClassDescriptorRegistry();
+        var userObjectDescriptorFactory = new ClassDescriptorFactory(userObjectDescriptorRegistry);
+
+        var userObjectMarshaller = new DefaultUserObjectMarshaller(userObjectDescriptorRegistry, userObjectDescriptorFactory);
+
+        return new UserObjectSerializationContext(userObjectDescriptorRegistry, userObjectDescriptorFactory,
+                userObjectMarshaller);
+    }
+
+    private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(RecoveryDescriptorProvider provider) {
+        return createRecoveryClientHandshakeManager("client", UUID.randomUUID(), provider);
+    }
+
+    private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(String consistentId, UUID launchId,
+            RecoveryDescriptorProvider provider) {
+        return new RecoveryClientHandshakeManager(launchId, consistentId, CONNECTION_ID, MESSAGE_FACTORY, provider);
+    }
+
+    private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(RecoveryDescriptorProvider provider) {
+        return createRecoveryServerHandshakeManager("server", UUID.randomUUID(), provider);
+    }
+
+    private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(String consistentId, UUID launchId,
+            RecoveryDescriptorProvider provider) {
+        return new RecoveryServerHandshakeManager(launchId, consistentId, MESSAGE_FACTORY, provider);
+    }
+
+    private RecoveryDescriptorProvider createRecoveryDescriptorProvider() {

Review Comment:
   What's the point of this method?




[GitHub] [ignite-3] ibessonov commented on a diff in pull request #804: IGNITE-14085 Introduce network recovery protocol

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r871124573


##########
modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java:
##########
@@ -24,7 +24,7 @@
  */
 public interface NetworkMessage {
     /** Size of the message type (in bytes), used during (de-)serialization. */
-    static final int MSG_TYPE_SIZE_BYTES = 4;
+    int MSG_TYPE_SIZE_BYTES = 4;

Review Comment:
   I meant the message type specifically; as far as I know, it has its own code somewhere.
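
   For reference on the diff itself: fields declared in a Java interface are implicitly `public`, `static` and `final`, so dropping `static final` from `MSG_TYPE_SIZE_BYTES` does not change its meaning. The sketch below checks this via reflection on a hypothetical stand-in interface.

   ```java
   import java.lang.reflect.Modifier;

   public class InterfaceConstantModifiers {
       /** Hypothetical stand-in for NetworkMessage: the constant has no explicit modifiers. */
       interface Message {
           int MSG_TYPE_SIZE_BYTES = 4;
       }

       /** Returns true if the named field is public, static and final. */
       public static boolean isPublicStaticFinal(Class<?> type, String fieldName) {
           try {
               int mods = type.getDeclaredField(fieldName).getModifiers();
               return Modifier.isPublic(mods) && Modifier.isStatic(mods) && Modifier.isFinal(mods);
           } catch (NoSuchFieldException e) {
               return false;
           }
       }

       public static void main(String[] args) {
           // Prints true: the compiler adds the modifiers implicitly.
           System.out.println(isPublicStaticFinal(Message.class, "MSG_TYPE_SIZE_BYTES"));
       }
   }
   ```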




[GitHub] [ignite-3] ibessonov commented on a diff in pull request #804: IGNITE-14085 Introduce network recovery protocol

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r871127007


##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundRecoveryHandler.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.network.OutNetworkObject;
+
+/** Outbound handler that adds outgoing message to the recovery descriptor. */
+public class OutboundRecoveryHandler extends ChannelOutboundHandlerAdapter {
+    /** Handler name. */
+    public static final String NAME = "outbound-recovery-handler";
+
+    /** Recovery descriptor. */
+    private final RecoveryDescriptor descriptor;
+
+    /**
+     * Constructor.
+     *
+     * @param descriptor Recovery descriptor.
+     */
+    public OutboundRecoveryHandler(RecoveryDescriptor descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+        OutNetworkObject outNetworkObject = (OutNetworkObject) msg;
+
+        if (outNetworkObject.shouldBeSavedForRecovery()) {
+            descriptor.add(outNetworkObject);

Review Comment:
   Maybe they were final, or you tried to serialize them once again. It would be interesting to know.




[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #804: IGNITE-14085 Introduce network recovery protocol

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r872309307


##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -75,40 +79,41 @@ public class ConnectionManager {
     /** Node consistent id. */
     private final String consistentId;
 
-    /** Client handshake manager factory. */
-    private final Supplier<HandshakeManager> clientHandshakeManagerFactory;
+    /** Node launch id. As opposed to {@link #consistentId}, this identifier changes between restarts. */
+    private final UUID launchId;
 
     /** Start flag. */
     private final AtomicBoolean started = new AtomicBoolean(false);
 
     /** Stop flag. */
     private final AtomicBoolean stopped = new AtomicBoolean(false);
 
+    /** Recovery descriptor provider. */
+    private final RecoveryDescriptorProvider descriptorProvider = new DefaultRecoveryDescriptorProvider();

Review Comment:
   I'm not sure: a factory usually doesn't hold any state, and this provider is stateful.
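
   A minimal sketch of the stateful behaviour under discussion, assuming the provider must hand out the same descriptor for the same (consistent id, launch id, connection id, direction) combination, so that unacknowledged messages survive a reconnect, as `testExactlyOnce` relies on when it reconnects with the same providers. `CachingDescriptorProvider` is a hypothetical name; this is not the PR's `DefaultRecoveryDescriptorProvider`.

   ```java
   import java.util.List;
   import java.util.UUID;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;
   import java.util.function.Supplier;

   /** Hypothetical provider that caches one descriptor per connection key. */
   public class CachingDescriptorProvider<D> {
       /** The cache is the state: the same key must always map to the same descriptor. */
       private final ConcurrentMap<List<Object>, D> descriptors = new ConcurrentHashMap<>();

       /** Creates a fresh descriptor when a key is seen for the first time. */
       private final Supplier<D> descriptorFactory;

       public CachingDescriptorProvider(Supplier<D> descriptorFactory) {
           this.descriptorFactory = descriptorFactory;
       }

       /** Returns the existing descriptor for the key or atomically creates one (arguments must be non-null). */
       public D getRecoveryDescriptor(String consistentId, UUID launchId, short connectionId, boolean inbound) {
           List<Object> key = List.of(consistentId, launchId, connectionId, inbound);
           return descriptors.computeIfAbsent(key, k -> descriptorFactory.get());
       }
   }
   ```

   A stateless factory would create a new descriptor on every call and lose the unacknowledged messages between connections, which is the distinction the comment draws.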




[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #804: IGNITE-14085 Introduce network recovery protocol

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r872312674


##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -60,6 +63,7 @@ public class ConnectionManager {
     /** Server. */
     private final NettyServer server;
 
+    // TODO: IGNITE-16948 Should be a map consistentId -> connectionId -> sender

Review Comment:
   I've changed the issue accordingly.
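
   A possible shape for the structure named in the TODO (`consistentId -> connectionId -> sender`), sketched with nested concurrent maps. `SenderRegistry` and its methods are hypothetical; the sender type is left generic rather than assuming anything about `NettySender`.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   /** Hypothetical registry keyed by consistent id, then by connection id. */
   public class SenderRegistry<S> {
       private final Map<String, Map<Short, S>> senders = new ConcurrentHashMap<>();

       /** Registers a sender, creating the per-node map on first use. */
       public void put(String consistentId, short connectionId, S sender) {
           senders.computeIfAbsent(consistentId, id -> new ConcurrentHashMap<>()).put(connectionId, sender);
       }

       /** Returns the sender for the given node and connection, or null if there is none. */
       public S get(String consistentId, short connectionId) {
           Map<Short, S> byConnection = senders.get(consistentId);
           return byConnection == null ? null : byConnection.get(connectionId);
       }

       /** Drops all senders of a node, e.g. when it leaves the cluster. */
       public void removeNode(String consistentId) {
           senders.remove(consistentId);
       }
   }
   ```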


