Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/05/19 16:38:42 UTC

[GitHub] [ignite-3] SammyVimes opened a new pull request #135: IGNITE-14082 Network handshake

SammyVimes opened a new pull request #135:
URL: https://github.com/apache/ignite-3/pull/135


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] SammyVimes commented on a change in pull request #135: IGNITE-14082 Network handshake

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on a change in pull request #135:
URL: https://github.com/apache/ignite-3/pull/135#discussion_r637375227



##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -110,21 +125,24 @@ public SocketAddress getLocalAddress() {
 
     /**
      * Gets a {@link NettySender}, that sends data from this node to another node with the specified address.
+     * @param consistentId Another node's consistent id.
      * @param address Another node's address.
      * @return Sender.
      */
-    public CompletableFuture<NettySender> channel(SocketAddress address) {
-        NettySender channel = channels.compute(
-            address,
-            (addr, sender) -> (sender == null || !sender.isOpen()) ? null : sender
-        );
-
-        if (channel != null)
-            return CompletableFuture.completedFuture(channel);
+    public CompletableFuture<NettySender> channel(@Nullable String consistentId, SocketAddress address) {
+        if (consistentId != null) {

Review comment:
       Sure







[GitHub] [ignite-3] SammyVimes commented on a change in pull request #135: IGNITE-14082 Network handshake

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on a change in pull request #135:
URL: https://github.com/apache/ignite-3/pull/135#discussion_r637374352



##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -110,21 +125,24 @@ public SocketAddress getLocalAddress() {
 
     /**
      * Gets a {@link NettySender}, that sends data from this node to another node with the specified address.
+     * @param consistentId Another node's consistent id.
      * @param address Another node's address.
      * @return Sender.
      */
-    public CompletableFuture<NettySender> channel(SocketAddress address) {
-        NettySender channel = channels.compute(
-            address,
-            (addr, sender) -> (sender == null || !sender.isOpen()) ? null : sender
-        );
-
-        if (channel != null)
-            return CompletableFuture.completedFuture(channel);
+    public CompletableFuture<NettySender> channel(@Nullable String consistentId, SocketAddress address) {
+        if (consistentId != null) {
+            NettySender channel = channels.compute(
+                consistentId,
+                (addr, sender) -> (sender == null || !sender.isOpen()) ? null : sender
+            );
+
+            if (channel != null)
+                return CompletableFuture.completedFuture(channel);
+        }
 
         NettyClient client = clients.compute(address, (addr, existingClient) ->
             existingClient != null && !existingClient.failedToConnect() && !existingClient.isDisconnected() ?
-                existingClient : connect(addr)
+                existingClient : connect(addr, this::onMessage)
         );
 
         return client.sender();

Review comment:
       Something is wrong with my IDEA; it stopped warning me about such things. This definitely needs an assertion, thank you.
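
For illustration only, a minimal sketch of the kind of assertion discussed here. It assumes that `sender()` simply returns the `@Nullable` `clientFuture` field, which the diff does not show; `Client`, `senderOf` and the `String` payload are hypothetical stand-ins for `NettyClient` and `NettySender`, not the actual code of this PR.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in types; not the real NettyClient or ConnectionManager. */
class SenderAssertionSketch {
    static final class Client {
        /** Null until start() is called, like a @Nullable future field. */
        private volatile CompletableFuture<String> clientFuture;

        void start() {
            clientFuture = CompletableFuture.completedFuture("sender");
        }

        /** Assumption: returns the nullable field as-is. */
        CompletableFuture<String> sender() {
            return clientFuture;
        }
    }

    /** Returns the client's sender future, asserting that the client was started. */
    static CompletableFuture<String> senderOf(Client client) {
        CompletableFuture<String> sender = client.sender();

        // Java assertions are only evaluated when the JVM runs with -ea.
        assert sender != null : "client must be started before its sender is requested";

        return sender;
    }

    public static void main(String[] args) {
        Client client = new Client();
        client.start();
        System.out.println(senderOf(client).join());
    }
}
```

The assertion documents the invariant for readers and static analysis, but it is a no-op in production unless assertions are enabled.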







[GitHub] [ignite-3] SammyVimes commented on a change in pull request #135: IGNITE-14082 Network handshake

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on a change in pull request #135:
URL: https://github.com/apache/ignite-3/pull/135#discussion_r637374044



##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeManager.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.handshake;
+
+import io.netty.channel.Channel;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.network.internal.netty.HandshakeHandler;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Handshake operation manager.
+ */
+public interface HandshakeManager {
+    /**
+     * Initialize handshake manager with the channel.
+     *
+     * @param channel Channel.
+     */
+    void init(Channel channel);
+
+    /**
+     * Handle an event of the connection opening.
+     *
+     * @param channel Channel.
+     * @return Action to perform by {@link HandshakeHandler}.
+     */
+    HandshakeAction onConnectionOpen(Channel channel);

Review comment:
       In the future, I would like this method to check whether a "handshaken" channel already exists, so that the new channel can be closed in favor of the existing one. For now, I guess it won't hurt for it to always be a no-op.







[GitHub] [ignite-3] sashapolo commented on a change in pull request #135: IGNITE-14082 Network handshake

Posted by GitBox <gi...@apache.org>.
sashapolo commented on a change in pull request #135:
URL: https://github.com/apache/ignite-3/pull/135#discussion_r636322714



##########
File path: modules/network/src/integrationTest/java/org/apache/ignite/network/internal/recovery/RecoveryHandshakeTest.java
##########
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery;
+
+public class RecoveryHandshakeTest {

Review comment:
       What is this class for?







[GitHub] [ignite-3] SammyVimes commented on a change in pull request #135: IGNITE-14082 Network handshake

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on a change in pull request #135:
URL: https://github.com/apache/ignite-3/pull/135#discussion_r637373844



##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/HandshakeHandler.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.handshake.HandshakeManager;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Netty handler of the handshake operation.
+ */
+public class HandshakeHandler extends ChannelDuplexHandler {

Review comment:
       Sure, this should be an InboundHandler. In the beginning I thought this handler would be more like a `RecoveryProtocolHandler`, handling not only inbound messages but also outbound ones (incrementing counters and waiting for ACKs). But now I think this handler will not just be removed but replaced by a recovery protocol handler after the handshake, hence changing it to an InboundHandler.







[GitHub] [ignite-3] sashapolo commented on a change in pull request #135: IGNITE-14082 Network handshake

Posted by GitBox <gi...@apache.org>.
sashapolo commented on a change in pull request #135:
URL: https://github.com/apache/ignite-3/pull/135#discussion_r636325950



##########
File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
##########
@@ -62,7 +64,9 @@
                     topologyService.onMembershipEvent(event);
                 }
             })
-            .config(opts -> opts.memberAlias(context.getName()))
+            .config(opts -> {

Review comment:
       No need for braces here

##########
File path: modules/network/src/integrationTest/java/org/apache/ignite/network/internal/recovery/RecoveryHandshakeTest.java
##########
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery;
+
+public class RecoveryHandshakeTest {

Review comment:
       What is this class for?

##########
File path: modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyServerTest.java
##########
@@ -177,4 +282,29 @@ private static NettyServer getServer(ChannelFuture future, boolean shouldStart)
     private static class EmbeddedServerChannel extends EmbeddedChannel implements ServerChannel {
         // No-op.
     }
+
+    /**
+     * Server handshake manager that doesn't do any actual handshaking.
+     */
+    private static class MockServerHandshakeManager implements HandshakeManager {

Review comment:
       It would be shorter to use Mockito instead.

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
##########
@@ -45,10 +51,16 @@
     @Nullable
     private volatile CompletableFuture<NettySender> clientFuture = null;
 
+    private CompletableFuture<Void> channelFuture = new CompletableFuture<>();

Review comment:
       1. Why do you need this future?
   2. Javadoc is missing

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeManager.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.handshake;
+
+import io.netty.channel.Channel;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.network.internal.netty.HandshakeHandler;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Handshake operation manager.
+ */
+public interface HandshakeManager {
+    /**
+     * Initialize handshake manager with the channel.
+     *
+     * @param channel Channel.
+     */
+    void init(Channel channel);
+
+    /**
+     * Handle an event of the connection opening.
+     *
+     * @param channel Channel.
+     * @return Action to perform by {@link HandshakeHandler}.
+     */
+    HandshakeAction onConnectionOpen(Channel channel);

Review comment:
       This method always returns `NOOP`. Does it really need to return anything?

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -146,24 +164,32 @@ private void onMessage(SocketAddress from, NetworkMessage message) {
      * @param channel Channel from client to this {@link #server}.
      */
     private void onNewIncomingChannel(NettySender channel) {
-        SocketAddress remoteAddress = channel.remoteAddress();
-        channels.put(remoteAddress, channel);
+        channels.put(channel.consistentId(), channel);
     }
 
     /**
      * Create new client from this node to specified address.
      *
      * @param address Target address.
+     * @param messageListener Message listener.
      * @return New netty client.
      */
-    private NettyClient connect(SocketAddress address) {
-        NettyClient client = new NettyClient(address, serializationRegistry);
+    private NettyClient connect(
+        SocketAddress address,
+        BiConsumer<SocketAddress, NetworkMessage> messageListener

Review comment:
       This parameter is always the same. Can we inline it?

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartResponseMessage.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery.message;
+
+import java.util.UUID;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Handshake start response message.
+ */
+public class HandshakeStartResponseMessage implements NetworkMessage {
+    /** */
+    public static final byte TYPE = 3;
+
+    /** Launch id. */
+    private final UUID launchId;
+
+    /** Consistent id. */
+    private final String consistentId;
+
+    /** Number of received messages. */
+    private final long rcvCnt;

Review comment:
       Why do you need these counters?

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -110,21 +125,24 @@ public SocketAddress getLocalAddress() {
 
     /**
      * Gets a {@link NettySender}, that sends data from this node to another node with the specified address.
+     * @param consistentId Another node's consistent id.
      * @param address Another node's address.
      * @return Sender.
      */
-    public CompletableFuture<NettySender> channel(SocketAddress address) {
-        NettySender channel = channels.compute(
-            address,
-            (addr, sender) -> (sender == null || !sender.isOpen()) ? null : sender
-        );
-
-        if (channel != null)
-            return CompletableFuture.completedFuture(channel);
+    public CompletableFuture<NettySender> channel(@Nullable String consistentId, SocketAddress address) {
+        if (consistentId != null) {

Review comment:
       Can you add some comments in the code explaining the logic of this method? I already forgot the correspondence between `channels` and `clients` map =(
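
As a rough aid for the question above, here is a simplified model of the lookup as it reads from the diff: `channels` is keyed by the remote node's consistent id and `clients` is keyed by address. `Sender`, `Client` and the `String` addresses are hypothetical stand-ins, so treat this as a sketch of the control flow under those assumptions, not as the actual `ConnectionManager`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified model of the two-map lookup. */
class ChannelLookupSketch {
    /** Stand-in for NettySender: only the open/closed state matters here. */
    static final class Sender {
        final String consistentId;
        volatile boolean open = true;

        Sender(String consistentId) {
            this.consistentId = consistentId;
        }
    }

    /** Stand-in for NettyClient: an outgoing connection that yields a sender later. */
    static final class Client {
        final String address;
        final CompletableFuture<Sender> sender = new CompletableFuture<>();
        volatile boolean failed;

        Client(String address) {
            this.address = address;
        }
    }

    /** Established channels, keyed by the remote node's consistent id. */
    final Map<String, Sender> channels = new ConcurrentHashMap<>();

    /** Outgoing clients, keyed by the remote address. */
    final Map<String, Client> clients = new ConcurrentHashMap<>();

    CompletableFuture<Sender> channel(String consistentId, String address) {
        if (consistentId != null) {
            // Returning null from compute() removes the mapping, so a closed
            // sender is evicted and the lookup falls through to the clients map.
            Sender existing = channels.compute(
                consistentId,
                (id, sender) -> (sender == null || !sender.open) ? null : sender
            );

            if (existing != null)
                return CompletableFuture.completedFuture(existing);
        }

        // No usable channel is known: reuse a healthy client or create a new one.
        Client client = clients.compute(
            address,
            (addr, existing) -> existing != null && !existing.failed ? existing : new Client(addr)
        );

        return client.sender;
    }
}
```

The first map answers "do we already have an open channel to this node?", while the second answers "are we already connecting to this address?", which is why a null consistent id skips straight to the second step.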

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
##########
@@ -122,4 +123,10 @@ public InboundDecoder(MessageSerializationRegistry serializationRegistry) {
             }
         }
     }
+
+    /** */

Review comment:
       Can you add a comment explaining why this method is needed?

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -110,21 +125,24 @@ public SocketAddress getLocalAddress() {
 
     /**
      * Gets a {@link NettySender}, that sends data from this node to another node with the specified address.
+     * @param consistentId Another node's consistent id.
      * @param address Another node's address.
      * @return Sender.
      */
-    public CompletableFuture<NettySender> channel(SocketAddress address) {
-        NettySender channel = channels.compute(
-            address,
-            (addr, sender) -> (sender == null || !sender.isOpen()) ? null : sender
-        );
-
-        if (channel != null)
-            return CompletableFuture.completedFuture(channel);
+    public CompletableFuture<NettySender> channel(@Nullable String consistentId, SocketAddress address) {
+        if (consistentId != null) {
+            NettySender channel = channels.compute(
+                consistentId,
+                (addr, sender) -> (sender == null || !sender.isOpen()) ? null : sender
+            );
+
+            if (channel != null)
+                return CompletableFuture.completedFuture(channel);
+        }
 
         NettyClient client = clients.compute(address, (addr, existingClient) ->
             existingClient != null && !existingClient.failedToConnect() && !existingClient.isDisconnected() ?
-                existingClient : connect(addr)
+                existingClient : connect(addr, this::onMessage)
         );
 
         return client.sender();

Review comment:
       IDEA complains that `sender` might return `null`. Shall we add an `assert` here?

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeManager.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.handshake;
+
+import io.netty.channel.Channel;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.network.internal.netty.HandshakeHandler;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Handshake operation manager.
+ */
+public interface HandshakeManager {
+    /**
+     * Initialize handshake manager with the channel.
+     *
+     * @param channel Channel.
+     */
+    void init(Channel channel);
+
+    /**
+     * Handle an event of the connection opening.
+     *
+     * @param channel Channel.
+     * @return Action to perform by {@link HandshakeHandler}.
+     */
+    HandshakeAction onConnectionOpen(Channel channel);
+
+    /**
+     * Handle an incoming message.
+     *
+     * @param channel Channel.
+     * @param message Message to handle.
+     * @return Action to perform by {@link HandshakeHandler}.
+     */
+    HandshakeAction onMessage(Channel channel, NetworkMessage message);
+
+    /**
+     * @return Future that represents handshake operation.

Review comment:
       ```suggestion
        * @return Future that represents the handshake operation.
   ```

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/HandshakeHandler.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.handshake.HandshakeManager;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Netty handler of the handshake operation.
+ */
+public class HandshakeHandler extends ChannelDuplexHandler {

Review comment:
       Why is this a `ChannelDuplexHandler`? I can see that it only handles incoming messages...

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeAction.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.handshake;
+
+import org.apache.ignite.network.internal.netty.HandshakeHandler;
+
+/**
+ * Enum representing actions that are propagated from a {@link HandshakeManager} to a {@link HandshakeHandler}.
+ */
+public enum HandshakeAction {
+    /** Fail handshake operation and close channel. */
+    FAIL,
+
+    /** Remove handshake handler. */

Review comment:
       ```suggestion
       /** Remove the handshake handler. */
   ```

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeAction.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.handshake;
+
+import org.apache.ignite.network.internal.netty.HandshakeHandler;
+
+/**
+ * Enum representing actions that are propagated from a {@link HandshakeManager} to a {@link HandshakeHandler}.
+ */
+public enum HandshakeAction {
+    /** Fail handshake operation and close channel. */

Review comment:
       ```suggestion
       /** Fail the handshake operation and close the channel. */
   ```

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
##########
@@ -57,54 +69,85 @@
      *
      * @param host Host.
      * @param port Port.
+     * @param manager Client handshake manager.
      * @param serializationRegistry Serialization registry.
+     * @param messageListener Message listener.
      */
     public NettyClient(

Review comment:
       This constructor is not used anywhere

##########
File path: modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyServerTest.java
##########
@@ -147,6 +163,95 @@ public void testStartTwice() throws Exception {
         assertThrows(IgniteInternalException.class, server::start);
     }
 
+    /**
+     * Tests that handshake manager is invoked upon a client connecting to a server.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testHandshakeManagerInvoked() throws Exception {
+        HandshakeManager handshakeManager = Mockito.mock(HandshakeManager.class);
+
+        Mockito.doReturn(CompletableFuture.completedFuture(Mockito.mock(NettySender.class)))
+            .when(handshakeManager).handshakeFuture();
+
+        Mockito.doReturn(HandshakeAction.NOOP)
+            .when(handshakeManager).onConnectionOpen(Mockito.any());
+
+        Mockito.doReturn(HandshakeAction.NOOP)
+            .when(handshakeManager).onMessage(Mockito.any(), Mockito.any());
+
+        MessageSerializationRegistry registry = new MessageSerializationRegistry() {
+            /** {@inheritDoc} */
+            @Override public <T extends NetworkMessage> MessageDeserializer<T> createDeserializer(short type) {
+                return (MessageDeserializer<T>) new MessageDeserializer<NetworkMessage>() {
+                    /** {@inheritDoc} */
+                    @Override public boolean readMessage(MessageReader reader) throws MessageMappingException {
+                        return true;
+                    }
+
+                    /** {@inheritDoc} */
+                    @Override public Class<NetworkMessage> klass() {
+                        return NetworkMessage.class;
+                    }
+
+                    /** {@inheritDoc} */
+                    @Override public NetworkMessage getMessage() {
+                        return new NetworkMessage() {
+                            /** {@inheritDoc} */
+                            @Override public short directType() {
+                                return 0;
+                            }
+                        };
+                    }
+                };
+            }
+        };
+
+        server = new NettyServer(4000, handshakeManager, sender -> {}, (socketAddress, message) -> {}, registry);
+
+        server.start().get(3, TimeUnit.SECONDS);
+
+        CompletableFuture<Channel> connectFut = NettyUtils.toChannelCompletableFuture(
+            new Bootstrap()
+                .channel(NioSocketChannel.class)
+                .group(new NioEventLoopGroup())
+                .handler(new ChannelInitializer<>() {
+                    /** {@inheritDoc} */
+                    @Override protected void initChannel(Channel ch) throws Exception {
+                        // No-op.
+                    }
+                })
+                .connect(server.address())
+        );
+
+        Channel channel = connectFut.get(3, TimeUnit.SECONDS);
+
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+
+        // One message only.
+        for (int i = 0; i < (NetworkMessage.DIRECT_TYPE_SIZE + 1); i++)
+            buffer.writeByte(1);
+
+        NettyUtils.toCompletableFuture(channel.writeAndFlush(buffer)).get(3, TimeUnit.SECONDS);

Review comment:
       There's no need to call `NettyUtils.toCompletableFuture` here







[GitHub] [ignite-3] ibessonov commented on a change in pull request #135: IGNITE-14082 Network handshake

Posted by GitBox <gi...@apache.org>.
ibessonov commented on a change in pull request #135:
URL: https://github.com/apache/ignite-3/pull/135#discussion_r641388924



##########
File path: modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
##########
@@ -23,7 +23,7 @@
 @AutoSerializable(messageFactory = TestMessageFactory.class)
 public interface TestMessage extends NetworkMessage {
     /** Visible type for tests. */
-    public static final short TYPE = 3;
+    public static final short TYPE = 4;

Review comment:
       Can we use something like 16000 to avoid collisions entirely?
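
One way to make that guarantee explicit, sketched with hypothetical names (`MessageTypeRegistry` and `TEST_TYPE_BASE` are assumptions, not Ignite APIs): keep production types low, reserve everything from 16000 upward for tests, and fail fast on a duplicate registration.

```java
import java.util.HashMap;
import java.util.Map;

final class MessageTypeRegistry {
    /** Hypothetical lower bound of the range reserved for test-only message types. */
    static final short TEST_TYPE_BASE = 16000;

    /** Registered type ids mapped to the message names that own them. */
    private final Map<Short, String> names = new HashMap<>();

    /** Registers a type and throws if it is already taken, so a collision surfaces immediately. */
    void register(short type, String name) {
        String prev = names.putIfAbsent(type, name);

        if (prev != null)
            throw new IllegalStateException("Type " + type + " is already used by " + prev);
    }

    /** Returns true if the type lies in the reserved test range. */
    static boolean isTestType(short type) {
        return type >= TEST_TYPE_BASE;
    }

    public static void main(String[] args) {
        MessageTypeRegistry registry = new MessageTypeRegistry();

        // Type 3 is the value HandshakeStartResponseMessage uses in this PR.
        registry.register((short) 3, "HandshakeStartResponseMessage");
        registry.register(TEST_TYPE_BASE, "TestMessage");

        System.out.println(isTestType(TEST_TYPE_BASE));
    }
}
```

With a reserved range, adding a new production message no longer forces the test constant to be bumped, which is what happened here when `TYPE` moved from 3 to 4.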







[GitHub] [ignite-3] sashapolo commented on a change in pull request #135: IGNITE-14082 Network handshake

Posted by GitBox <gi...@apache.org>.
sashapolo commented on a change in pull request #135:
URL: https://github.com/apache/ignite-3/pull/135#discussion_r637086926



##########
File path: modules/network/src/integrationTest/java/org/apache/ignite/network/internal/recovery/RecoveryHandshakeTest.java
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery;
+
+import io.netty.channel.Channel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TestMessageSerializationFactory;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.internal.netty.NettyUtils;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_CONNECTION_OPENED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_DOESNT_FAIL;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_INIT;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_SERVER_RESPONDED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CLIENT_RESPONDED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CONNECTION_OPENED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_DOESNT_FAIL;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_INIT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Recovery protocol handshake tests.
+ */
+public class RecoveryHandshakeTest {
+    /** Started connection managers. */
+    private final List<ConnectionManager> startedManagers = new ArrayList<>();
+
+    /** */
+    @AfterEach
+    final void tearDown() {
+        startedManagers.forEach(ConnectionManager::stop);
+    }
+
+    /**
+     * Tests handshake scenarios in which some of the parts of handshake protocol are failed.

Review comment:
       ```suggestion
        * Tests handshake scenarios in which some of the parts of handshake protocol can fail.
   ```

##########
File path: modules/network/src/integrationTest/java/org/apache/ignite/network/internal/recovery/RecoveryHandshakeTest.java
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery;
+
+import io.netty.channel.Channel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TestMessageSerializationFactory;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.internal.netty.NettyUtils;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_CONNECTION_OPENED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_DOESNT_FAIL;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_INIT;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_SERVER_RESPONDED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CLIENT_RESPONDED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CONNECTION_OPENED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_DOESNT_FAIL;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_INIT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Recovery protocol handshake tests.
+ */
+public class RecoveryHandshakeTest {
+    /** Started connection managers. */
+    private final List<ConnectionManager> startedManagers = new ArrayList<>();
+
+    /** */
+    @AfterEach
+    final void tearDown() {
+        startedManagers.forEach(ConnectionManager::stop);
+    }
+
+    /**
+     * Tests handshake scenarios in which some of the parts of handshake protocol are failed.
+     *
+     * @param scenario Handshake scenario.
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @MethodSource("handshakeScenarios")
+    public void testHandshakeScenario(HandshakeScenario scenario) throws Exception {
+        ConnectionManager manager1 = startManager(
+            4000,
+            scenario.serverFailAt,
+            CLIENT_DOESNT_FAIL
+        );
+
+        ConnectionManager manager2 = startManager(
+            4001,
+            SERVER_DOESNT_FAIL,
+            scenario.clientFailAt
+        );
+
+        NettySender from2to1;
+
+        try {
+            from2to1 = manager2.channel(manager1.consistentId(), manager1.getLocalAddress()).get(3, TimeUnit.SECONDS);
+        }
+        catch (Exception e) {
+            if (scenario.clientFailAt == CLIENT_DOESNT_FAIL &&
+                scenario.serverFailAt == SERVER_DOESNT_FAIL)
+                Assertions.fail(e);
+
+            return;
+        }
+
+        if (scenario.serverFailAt == SERVER_CLIENT_RESPONDED && scenario.clientFailAt == CLIENT_DOESNT_FAIL) {
+            // Special case: the client assumes a handshake has been finished, but the server fails on client's response.
+            // The server will then close a connection and the client should get the "connection closed event".
+            NettyUtils.toCompletableFuture(from2to1.channel().closeFuture()).get(3, TimeUnit.SECONDS);

Review comment:
       There's no need for `NettyUtils` here.

##########
File path: modules/network/src/integrationTest/java/org/apache/ignite/network/internal/recovery/RecoveryHandshakeTest.java
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery;
+
+import io.netty.channel.Channel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TestMessageSerializationFactory;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.internal.netty.NettyUtils;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_CONNECTION_OPENED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_DOESNT_FAIL;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_INIT;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_SERVER_RESPONDED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CLIENT_RESPONDED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CONNECTION_OPENED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_DOESNT_FAIL;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_INIT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Recovery protocol handshake tests.
+ */
+public class RecoveryHandshakeTest {
+    /** Started connection managers. */
+    private final List<ConnectionManager> startedManagers = new ArrayList<>();
+
+    /** */
+    @AfterEach
+    final void tearDown() {
+        startedManagers.forEach(ConnectionManager::stop);
+    }
+
+    /**
+     * Tests handshake scenarios in which some of the parts of handshake protocol are failed.
+     *
+     * @param scenario Handshake scenario.
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @MethodSource("handshakeScenarios")
+    public void testHandshakeScenario(HandshakeScenario scenario) throws Exception {
+        ConnectionManager manager1 = startManager(
+            4000,
+            scenario.serverFailAt,
+            CLIENT_DOESNT_FAIL
+        );
+
+        ConnectionManager manager2 = startManager(
+            4001,
+            SERVER_DOESNT_FAIL,
+            scenario.clientFailAt
+        );
+
+        NettySender from2to1;
+
+        try {
+            from2to1 = manager2.channel(manager1.consistentId(), manager1.getLocalAddress()).get(3, TimeUnit.SECONDS);
+        }
+        catch (Exception e) {
+            if (scenario.clientFailAt == CLIENT_DOESNT_FAIL &&
+                scenario.serverFailAt == SERVER_DOESNT_FAIL)
+                Assertions.fail(e);
+
+            return;
+        }
+
+        if (scenario.serverFailAt == SERVER_CLIENT_RESPONDED && scenario.clientFailAt == CLIENT_DOESNT_FAIL) {
+            // Special case: the client assumes a handshake has been finished, but the server fails on client's response.
+            // The server will then close a connection and the client should get the "connection closed event".
+            NettyUtils.toCompletableFuture(from2to1.channel().closeFuture()).get(3, TimeUnit.SECONDS);
+            return;
+        }
+
+        if (scenario.clientFailAt != CLIENT_DOESNT_FAIL || scenario.serverFailAt != SERVER_DOESNT_FAIL)

Review comment:
       I would suggest extracting this condition into a method
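
A rough sketch of that extraction, assuming the check lives on the scenario object. The class name `HandshakeScenarioSketch` and the method name `failureExpected` are hypothetical, and the enums list only the constants that the test imports.

```java
final class HandshakeScenarioSketch {
    /** Server-side failure stages, as imported by the test. */
    enum ServerStageFail { SERVER_INIT, SERVER_CONNECTION_OPENED, SERVER_CLIENT_RESPONDED, SERVER_DOESNT_FAIL }

    /** Client-side failure stages, as imported by the test. */
    enum ClientStageFail { CLIENT_INIT, CLIENT_CONNECTION_OPENED, CLIENT_SERVER_RESPONDED, CLIENT_DOESNT_FAIL }

    /** Stage at which the server should fail. */
    final ServerStageFail serverFailAt;

    /** Stage at which the client should fail. */
    final ClientStageFail clientFailAt;

    HandshakeScenarioSketch(ServerStageFail serverFailAt, ClientStageFail clientFailAt) {
        this.serverFailAt = serverFailAt;
        this.clientFailAt = clientFailAt;
    }

    /** Returns true if either side is configured to fail at some stage. */
    boolean failureExpected() {
        return clientFailAt != ClientStageFail.CLIENT_DOESNT_FAIL
            || serverFailAt != ServerStageFail.SERVER_DOESNT_FAIL;
    }

    public static void main(String[] args) {
        HandshakeScenarioSketch scenario = new HandshakeScenarioSketch(
            ServerStageFail.SERVER_INIT,
            ClientStageFail.CLIENT_DOESNT_FAIL
        );

        System.out.println(scenario.failureExpected());
    }
}
```

The `catch` block earlier in the test checks the negation of the same condition, so both places could share the one method.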

##########
File path: modules/network/src/integrationTest/java/org/apache/ignite/network/internal/recovery/RecoveryHandshakeTest.java
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery;
+
+import io.netty.channel.Channel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TestMessageSerializationFactory;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.internal.netty.NettyUtils;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_CONNECTION_OPENED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_DOESNT_FAIL;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_INIT;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_SERVER_RESPONDED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CLIENT_RESPONDED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CONNECTION_OPENED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_DOESNT_FAIL;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_INIT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Recovery protocol handshake tests.
+ */
+public class RecoveryHandshakeTest {
+    /** Started connection managers. */
+    private final List<ConnectionManager> startedManagers = new ArrayList<>();
+
+    /** */
+    @AfterEach
+    final void tearDown() {
+        startedManagers.forEach(ConnectionManager::stop);
+    }
+
+    /**
+     * Tests handshake scenarios in which some of the parts of handshake protocol are failed.
+     *
+     * @param scenario Handshake scenario.
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @MethodSource("handshakeScenarios")
+    public void testHandshakeScenario(HandshakeScenario scenario) throws Exception {
+        ConnectionManager manager1 = startManager(
+            4000,
+            scenario.serverFailAt,
+            CLIENT_DOESNT_FAIL

Review comment:
       What does failing some of the parts actually test? I don't see any assertions related to it.







[GitHub] [ignite-3] SammyVimes commented on a change in pull request #135: IGNITE-14082 Network handshake

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on a change in pull request #135:
URL: https://github.com/apache/ignite-3/pull/135#discussion_r639473116



##########
File path: modules/network/src/integrationTest/java/org/apache/ignite/network/internal/recovery/RecoveryHandshakeTest.java
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery;
+
+import io.netty.channel.Channel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TestMessageSerializationFactory;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.internal.netty.NettyUtils;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_CONNECTION_OPENED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_DOESNT_FAIL;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_INIT;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_SERVER_RESPONDED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CLIENT_RESPONDED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CONNECTION_OPENED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_DOESNT_FAIL;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_INIT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Recovery protocol handshake tests.
+ */
+public class RecoveryHandshakeTest {
+    /** Started connection managers. */
+    private final List<ConnectionManager> startedManagers = new ArrayList<>();
+
+    /** */
+    @AfterEach
+    final void tearDown() {
+        startedManagers.forEach(ConnectionManager::stop);
+    }
+
+    /**
+     * Tests handshake scenarios in which some of the parts of handshake protocol are failed.
+     *
+     * @param scenario Handshake scenario.
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @MethodSource("handshakeScenarios")
+    public void testHandshakeScenario(HandshakeScenario scenario) throws Exception {
+        ConnectionManager manager1 = startManager(
+            4000,
+            scenario.serverFailAt,
+            CLIENT_DOESNT_FAIL

Review comment:
       It tests two things:
   * a channel that fails the handshake is closed
   * connections do not become stale, and their futures complete exceptionally
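
The second check above could be made explicit with a small helper. This is a JDK-only sketch; `FailedHandshakeCheck` and `completesExceptionally` are hypothetical names, and a plain `CompletableFuture` stands in for the future returned by `ConnectionManager.channel(...)`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FailedHandshakeCheck {
    /**
     * Returns true only if the future fails within the timeout.
     * A timeout means the connection went stale, which is what the test has to rule out.
     */
    static boolean completesExceptionally(CompletableFuture<?> fut, long timeoutMillis) {
        try {
            fut.get(timeoutMillis, TimeUnit.MILLISECONDS);

            return false; // Completed normally: the handshake did not fail.
        }
        catch (ExecutionException e) {
            return true; // Failed promptly, as expected.
        }
        catch (TimeoutException e) {
            return false; // Still pending: the connection is stale.
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            return false;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("handshake failed"));

        // Never completed: models a connection attempt that hangs.
        CompletableFuture<String> stale = new CompletableFuture<>();

        System.out.println(completesExceptionally(failed, 100));
        System.out.println(completesExceptionally(stale, 100));
    }
}
```

Asserting on the helper's result in each failing scenario would turn the implicit expectation (the `return` inside the `catch` block) into a visible assertion.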







[GitHub] [ignite-3] ibessonov merged pull request #135: IGNITE-14082 Network handshake

Posted by GitBox <gi...@apache.org>.
ibessonov merged pull request #135:
URL: https://github.com/apache/ignite-3/pull/135


   





[GitHub] [ignite-3] SammyVimes commented on a change in pull request #135: IGNITE-14082 Network handshake

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on a change in pull request #135:
URL: https://github.com/apache/ignite-3/pull/135#discussion_r637374257



##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartResponseMessage.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery.message;
+
+import java.util.UUID;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Handshake start response message.
+ */
+public class HandshakeStartResponseMessage implements NetworkMessage {
+    /** */
+    public static final byte TYPE = 3;
+
+    /** Launch id. */
+    private final UUID launchId;
+
+    /** Consistent id. */
+    private final String consistentId;
+
+    /** Number of received messages. */
+    private final long rcvCnt;

Review comment:
       Basically, I was moving messages from Ignite 2.x to 3.x. These counters are part of the recovery protocol, so there is no need for them right now. I can remove them, of course, but they'll be added again in the next commit, so... :D
   







[GitHub] [ignite-3] SammyVimes commented on a change in pull request #135: IGNITE-14082 Network handshake

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on a change in pull request #135:
URL: https://github.com/apache/ignite-3/pull/135#discussion_r637374879



##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
##########
@@ -45,10 +51,16 @@
     @Nullable
     private volatile CompletableFuture<NettySender> clientFuture = null;
 
+    private CompletableFuture<Void> channelFuture = new CompletableFuture<>();

Review comment:
       1. Basically, we had one future for the client, `clientFuture`. It resolves not when the channel is open but when the handshake operation is complete. If we stop a client mid-handshake, there is no way to close the channel immediately. That's why I added a future for the "channel is open" event.
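
A minimal, JDK-only sketch of the two-future arrangement described above. `TwoFutureClientSketch` and `FakeChannel` are hypothetical stand-ins; in the PR the channel future is a `CompletableFuture<Void>` on `NettyClient`, so the typing here is an assumption made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class TwoFutureClientSketch {
    /** Stand-in for a Netty channel. */
    static final class FakeChannel {
        /** Set once the channel has been closed. */
        final AtomicBoolean closed = new AtomicBoolean();

        void close() {
            closed.set(true);
        }
    }

    /** Completes as soon as the channel is open, before the handshake finishes. */
    final CompletableFuture<FakeChannel> channelFuture = new CompletableFuture<>();

    /** Completes only when the handshake has finished. */
    final CompletableFuture<String> clientFuture = new CompletableFuture<>();

    /** Stops the client: the channel is closed once it is open, even if the handshake is still running. */
    void stop() {
        // Runs immediately if the channel is already open, otherwise when it opens.
        channelFuture.thenAccept(FakeChannel::close);

        // No-op if the handshake has already completed.
        clientFuture.completeExceptionally(new IllegalStateException("Client stopped"));
    }

    public static void main(String[] args) {
        TwoFutureClientSketch client = new TwoFutureClientSketch();
        FakeChannel ch = new FakeChannel();

        client.channelFuture.complete(ch); // The channel is open, the handshake is still pending.
        client.stop();

        System.out.println(ch.closed.get());
        System.out.println(client.clientFuture.isCompletedExceptionally());
    }
}
```

With only `clientFuture`, `stop()` would have nothing to hook into until the handshake finished; the separate channel future gives it a handle on the open channel in the meantime.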







[GitHub] [ignite-3] SammyVimes commented on a change in pull request #135: IGNITE-14082 Network handshake

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on a change in pull request #135:
URL: https://github.com/apache/ignite-3/pull/135#discussion_r639473116



##########
File path: modules/network/src/integrationTest/java/org/apache/ignite/network/internal/recovery/RecoveryHandshakeTest.java
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery;
+
+import io.netty.channel.Channel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TestMessageSerializationFactory;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.internal.netty.NettyUtils;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_CONNECTION_OPENED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_DOESNT_FAIL;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_INIT;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_SERVER_RESPONDED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CLIENT_RESPONDED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CONNECTION_OPENED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_DOESNT_FAIL;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_INIT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Recovery protocol handshake tests.
+ */
+public class RecoveryHandshakeTest {
+    /** Started connection managers. */
+    private final List<ConnectionManager> startedManagers = new ArrayList<>();
+
+    /** */
+    @AfterEach
+    final void tearDown() {
+        startedManagers.forEach(ConnectionManager::stop);
+    }
+
+    /**
+     * Tests handshake scenarios in which some of the parts of handshake protocol are failed.
+     *
+     * @param scenario Handshake scenario.
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @MethodSource("handshakeScenarios")
+    public void testHandshakeScenario(HandshakeScenario scenario) throws Exception {
+        ConnectionManager manager1 = startManager(
+            4000,
+            scenario.serverFailAt,
+            CLIENT_DOESNT_FAIL

Review comment:
       It tests several things at once: that a channel whose handshake failed is closed, that connections do not become stale, and that the futures complete exceptionally.
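
The three checks listed above can be illustrated with a small stand-alone sketch. This is not the real `ConnectionManager` or the test from the PR: `FailedHandshakeChecksSketch`, `FakeChannel` and the `connect` signature are hypothetical, and the handshake outcome is simulated with a boolean flag.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch, not the actual ConnectionManager or the test code from the PR. */
class FailedHandshakeChecksSketch {
    /** Minimal stand-in for a transport channel (not Netty's Channel). */
    static final class FakeChannel {
        private volatile boolean open = true;

        void close() {
            open = false;
        }

        boolean isOpen() {
            return open;
        }
    }

    /** Channels that completed the handshake, keyed by the remote node's consistent id. */
    final Map<String, FakeChannel> channels = new ConcurrentHashMap<>();

    /** Simulates a handshake on an already opened channel. */
    CompletableFuture<FakeChannel> connect(String consistentId, FakeChannel channel, boolean handshakeFails) {
        CompletableFuture<FakeChannel> result = new CompletableFuture<>();

        if (handshakeFails) {
            // The channel of a failed handshake is closed and never registered,
            // so no stale entry is left behind, and the caller sees the failure.
            channel.close();
            result.completeExceptionally(new IllegalStateException("Handshake failed"));
        }
        else {
            channels.put(consistentId, channel);
            result.complete(channel);
        }

        return result;
    }

    public static void main(String[] args) {
        FailedHandshakeChecksSketch manager = new FailedHandshakeChecksSketch();
        FakeChannel channel = new FakeChannel();

        CompletableFuture<FakeChannel> fut = manager.connect("node-a", channel, true);

        System.out.println("channel closed: " + !channel.isOpen());
        System.out.println("no stale connection: " + manager.channels.isEmpty());
        System.out.println("future failed: " + fut.isCompletedExceptionally());
    }
}
```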



