You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/05/21 10:00:06 UTC

[GitHub] [ignite-3] sashapolo commented on a change in pull request #135: IGNITE-14082 Network handshake

sashapolo commented on a change in pull request #135:
URL: https://github.com/apache/ignite-3/pull/135#discussion_r636325950



##########
File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
##########
@@ -62,7 +64,9 @@
                     topologyService.onMembershipEvent(event);
                 }
             })
-            .config(opts -> opts.memberAlias(context.getName()))
+            .config(opts -> {

Review comment:
       No need for braces here

##########
File path: modules/network/src/integrationTest/java/org/apache/ignite/network/internal/recovery/RecoveryHandshakeTest.java
##########
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery;
+
+public class RecoveryHandshakeTest {

Review comment:
       What is this class for?

##########
File path: modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyServerTest.java
##########
@@ -177,4 +282,29 @@ private static NettyServer getServer(ChannelFuture future, boolean shouldStart)
     private static class EmbeddedServerChannel extends EmbeddedChannel implements ServerChannel {
         // No-op.
     }
+
+    /**
+     * Server handshake manager that doesn't do any actual handshaking.
+     */
+    private static class MockServerHandshakeManager implements HandshakeManager {

Review comment:
       it would be shorter to use Mockito instead

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
##########
@@ -45,10 +51,16 @@
     @Nullable
     private volatile CompletableFuture<NettySender> clientFuture = null;
 
+    private CompletableFuture<Void> channelFuture = new CompletableFuture<>();

Review comment:
       1. Why do you need this future?
   2. Javadoc is missing

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeManager.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.handshake;
+
+import io.netty.channel.Channel;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.network.internal.netty.HandshakeHandler;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Handshake operation manager.
+ */
+public interface HandshakeManager {
+    /**
+     * Initialize handshake manager with the channel.
+     *
+     * @param channel Channel.
+     */
+    void init(Channel channel);
+
+    /**
+     * Handle an event of the connection opening.
+     *
+     * @param channel Channel.
+     * @return Action to perform by {@link HandshakeHandler}.
+     */
+    HandshakeAction onConnectionOpen(Channel channel);

Review comment:
       this method always returns a `NOOP`. Does it really need to return anything?

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -146,24 +164,32 @@ private void onMessage(SocketAddress from, NetworkMessage message) {
      * @param channel Channel from client to this {@link #server}.
      */
     private void onNewIncomingChannel(NettySender channel) {
-        SocketAddress remoteAddress = channel.remoteAddress();
-        channels.put(remoteAddress, channel);
+        channels.put(channel.consistentId(), channel);
     }
 
     /**
      * Create new client from this node to specified address.
      *
      * @param address Target address.
+     * @param messageListener Message listener.
      * @return New netty client.
      */
-    private NettyClient connect(SocketAddress address) {
-        NettyClient client = new NettyClient(address, serializationRegistry);
+    private NettyClient connect(
+        SocketAddress address,
+        BiConsumer<SocketAddress, NetworkMessage> messageListener

Review comment:
       this parameter is always the same, can we inline it?

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartResponseMessage.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery.message;
+
+import java.util.UUID;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Handshake start response message.
+ */
+public class HandshakeStartResponseMessage implements NetworkMessage {
+    /** */
+    public static final byte TYPE = 3;
+
+    /** Launch id. */
+    private final UUID launchId;
+
+    /** Consistent id. */
+    private final String consistentId;
+
+    /** Number of received messages. */
+    private final long rcvCnt;

Review comment:
       Why do you need these counters?

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -110,21 +125,24 @@ public SocketAddress getLocalAddress() {
 
     /**
      * Gets a {@link NettySender}, that sends data from this node to another node with the specified address.
+     * @param consistentId Another node's consistent id.
      * @param address Another node's address.
      * @return Sender.
      */
-    public CompletableFuture<NettySender> channel(SocketAddress address) {
-        NettySender channel = channels.compute(
-            address,
-            (addr, sender) -> (sender == null || !sender.isOpen()) ? null : sender
-        );
-
-        if (channel != null)
-            return CompletableFuture.completedFuture(channel);
+    public CompletableFuture<NettySender> channel(@Nullable String consistentId, SocketAddress address) {
+        if (consistentId != null) {

Review comment:
       Can you add some comments in the code explaining the logic of this method? I already forgot the correspondence between `channels` and `clients` map =(

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
##########
@@ -122,4 +123,10 @@ public InboundDecoder(MessageSerializationRegistry serializationRegistry) {
             }
         }
     }
+
+    /** */

Review comment:
       Can you add a comment about why is this method needed?

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -110,21 +125,24 @@ public SocketAddress getLocalAddress() {
 
     /**
      * Gets a {@link NettySender}, that sends data from this node to another node with the specified address.
+     * @param consistentId Another node's consistent id.
      * @param address Another node's address.
      * @return Sender.
      */
-    public CompletableFuture<NettySender> channel(SocketAddress address) {
-        NettySender channel = channels.compute(
-            address,
-            (addr, sender) -> (sender == null || !sender.isOpen()) ? null : sender
-        );
-
-        if (channel != null)
-            return CompletableFuture.completedFuture(channel);
+    public CompletableFuture<NettySender> channel(@Nullable String consistentId, SocketAddress address) {
+        if (consistentId != null) {
+            NettySender channel = channels.compute(
+                consistentId,
+                (addr, sender) -> (sender == null || !sender.isOpen()) ? null : sender
+            );
+
+            if (channel != null)
+                return CompletableFuture.completedFuture(channel);
+        }
 
         NettyClient client = clients.compute(address, (addr, existingClient) ->
             existingClient != null && !existingClient.failedToConnect() && !existingClient.isDisconnected() ?
-                existingClient : connect(addr)
+                existingClient : connect(addr, this::onMessage)
         );
 
         return client.sender();

Review comment:
       IDEA complains that `sender` might return `null`. Shall we add an `assert` here?

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeManager.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.handshake;
+
+import io.netty.channel.Channel;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.network.internal.netty.HandshakeHandler;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Handshake operation manager.
+ */
+public interface HandshakeManager {
+    /**
+     * Initialize handshake manager with the channel.
+     *
+     * @param channel Channel.
+     */
+    void init(Channel channel);
+
+    /**
+     * Handle an event of the connection opening.
+     *
+     * @param channel Channel.
+     * @return Action to perform by {@link HandshakeHandler}.
+     */
+    HandshakeAction onConnectionOpen(Channel channel);
+
+    /**
+     * Handle an incoming message.
+     *
+     * @param channel Channel.
+     * @param message Message to handle.
+     * @return Action to perform by {@link HandshakeHandler}.
+     */
+    HandshakeAction onMessage(Channel channel, NetworkMessage message);
+
+    /**
+     * @return Future that represents handshake operation.

Review comment:
       ```suggestion
        * @return Future that represents the handshake operation.
   ```

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/HandshakeHandler.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.handshake.HandshakeManager;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Netty handler of the handshake operation.
+ */
+public class HandshakeHandler extends ChannelDuplexHandler {

Review comment:
       Why is this a `ChannelDuplexHandler`? I can see that it only handles incoming messages...

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeAction.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.handshake;
+
+import org.apache.ignite.network.internal.netty.HandshakeHandler;
+
+/**
+ * Enum representing actions that are propagated from a {@link HandshakeManager} to a {@link HandshakeHandler}.
+ */
+public enum HandshakeAction {
+    /** Fail handshake operation and close channel. */
+    FAIL,
+
+    /** Remove handshake handler. */

Review comment:
       ```suggestion
       /** Remove the handshake handler. */
   ```

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeAction.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.handshake;
+
+import org.apache.ignite.network.internal.netty.HandshakeHandler;
+
+/**
+ * Enum representing actions that are propagated from a {@link HandshakeManager} to a {@link HandshakeHandler}.
+ */
+public enum HandshakeAction {
+    /** Fail handshake operation and close channel. */

Review comment:
       ```suggestion
       /** Fail the handshake operation and close the channel. */
   ```

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
##########
@@ -57,54 +69,85 @@
      *
      * @param host Host.
      * @param port Port.
+     * @param manager Client handshake manager.
      * @param serializationRegistry Serialization registry.
+     * @param messageListener Message listener.
      */
     public NettyClient(

Review comment:
       This constructor is not used anywhere

##########
File path: modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyServerTest.java
##########
@@ -147,6 +163,95 @@ public void testStartTwice() throws Exception {
         assertThrows(IgniteInternalException.class, server::start);
     }
 
+    /**
+     * Tests that handshake manager is invoked upon a client connecting to a server.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testHandshakeManagerInvoked() throws Exception {
+        HandshakeManager handshakeManager = Mockito.mock(HandshakeManager.class);
+
+        Mockito.doReturn(CompletableFuture.completedFuture(Mockito.mock(NettySender.class)))
+            .when(handshakeManager).handshakeFuture();
+
+        Mockito.doReturn(HandshakeAction.NOOP)
+            .when(handshakeManager).onConnectionOpen(Mockito.any());
+
+        Mockito.doReturn(HandshakeAction.NOOP)
+            .when(handshakeManager).onMessage(Mockito.any(), Mockito.any());
+
+        MessageSerializationRegistry registry = new MessageSerializationRegistry() {
+            /** {@inheritDoc} */
+            @Override public <T extends NetworkMessage> MessageDeserializer<T> createDeserializer(short type) {
+                return (MessageDeserializer<T>) new MessageDeserializer<NetworkMessage>() {
+                    /** {@inheritDoc} */
+                    @Override public boolean readMessage(MessageReader reader) throws MessageMappingException {
+                        return true;
+                    }
+
+                    /** {@inheritDoc} */
+                    @Override public Class<NetworkMessage> klass() {
+                        return NetworkMessage.class;
+                    }
+
+                    /** {@inheritDoc} */
+                    @Override public NetworkMessage getMessage() {
+                        return new NetworkMessage() {
+                            /** {@inheritDoc} */
+                            @Override public short directType() {
+                                return 0;
+                            }
+                        };
+                    }
+                };
+            }
+        };
+
+        server = new NettyServer(4000, handshakeManager, sender -> {}, (socketAddress, message) -> {}, registry);
+
+        server.start().get(3, TimeUnit.SECONDS);
+
+        CompletableFuture<Channel> connectFut = NettyUtils.toChannelCompletableFuture(
+            new Bootstrap()
+                .channel(NioSocketChannel.class)
+                .group(new NioEventLoopGroup())
+                .handler(new ChannelInitializer<>() {
+                    /** {@inheritDoc} */
+                    @Override protected void initChannel(Channel ch) throws Exception {
+                        // No-op.
+                    }
+                })
+                .connect(server.address())
+        );
+
+        Channel channel = connectFut.get(3, TimeUnit.SECONDS);
+
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+
+        // One message only.
+        for (int i = 0; i < (NetworkMessage.DIRECT_TYPE_SIZE + 1); i++)
+            buffer.writeByte(1);
+
+        NettyUtils.toCompletableFuture(channel.writeAndFlush(buffer)).get(3, TimeUnit.SECONDS);

Review comment:
       There's no need to call `NettyUtils.toCompletableFuture` here




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org