You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/08/11 08:01:09 UTC

[ignite-3] branch main updated: IGNITE-15275 Fix handshake operation (#265)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 93afd72  IGNITE-15275 Fix handshake operation (#265)
93afd72 is described below

commit 93afd72f9e7b3f4c59bcc02b5c7ec5a6955dd0f1
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Wed Aug 11 11:00:59 2021 +0300

    IGNITE-15275 Fix handshake operation (#265)
---
 .../internal/testframework/IgniteTestUtils.java    | 25 +++++++-
 .../network/recovery/RecoveryHandshakeTest.java    | 74 +++++++++++++++++++++-
 .../internal/network/netty/ConnectionManager.java  |  4 +-
 .../ignite/internal/network/netty/NettyServer.java | 18 ++++--
 .../internal/network/netty/NettyClientTest.java    |  4 +-
 .../internal/network/netty/NettyServerTest.java    |  8 +--
 6 files changed, 116 insertions(+), 17 deletions(-)

diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index 83043a5..838b7eb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -19,11 +19,12 @@ package org.apache.ignite.internal.testframework;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
-
+import java.util.function.BooleanSupplier;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static java.lang.Thread.sleep;
 import static org.junit.jupiter.api.Assertions.fail;
 
 /**
@@ -184,4 +185,26 @@ public final class IgniteTestUtils {
 
         return false;
     }
+
+    /**
+     * Polls the condition every 50 ms until it holds; it is checked one last time once the timeout expires.
+     *
+     * @param cond Condition.
+     * @param timeoutMillis Timeout in milliseconds.
+     * @return {@code True} if the condition held before or at the deadline, {@code false} otherwise.
+     * @throws InterruptedException If waiting was interrupted.
+     */
+    @SuppressWarnings("BusyWait") public static boolean waitForCondition(BooleanSupplier cond, long timeoutMillis)
+        throws InterruptedException {
+        long stop = System.currentTimeMillis() + timeoutMillis;
+
+        while (System.currentTimeMillis() < stop) {
+            if (cond.getAsBoolean())
+                return true;
+
+            sleep(50);
+        }
+        // Final check: covers a non-positive timeout and a condition that became true during the last sleep.
+        return cond.getAsBoolean();
+    }
 }
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/RecoveryHandshakeTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/RecoveryHandshakeTest.java
index f2b5456..42c6e7e 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/RecoveryHandshakeTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/RecoveryHandshakeTest.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.internal.network.recovery;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import io.netty.channel.Channel;
@@ -45,8 +47,10 @@ import static org.apache.ignite.internal.network.recovery.RecoveryHandshakeTest.
 import static org.apache.ignite.internal.network.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CONNECTION_OPENED;
 import static org.apache.ignite.internal.network.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_DOESNT_FAIL;
 import static org.apache.ignite.internal.network.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_INIT;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Recovery protocol handshake tests.
@@ -139,6 +143,46 @@ public class RecoveryHandshakeTest {
     }
 
     /**
+     * Checks that one server accepts connections from several clients and completes the handshake with each one.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testHandshakeWithMultipleClients() throws Exception {
+        final int clientCount = 10;
+
+        ConnectionManager server = startManager(4000);
+
+        List<ConnectionManager> clients = new ArrayList<>();
+
+        for (int idx = 0; idx < clientCount; idx++)
+            clients.add(startManager(4001 + idx));
+
+        // Maps a client's consistent id to the channel which that client opened to the server.
+        Map<String, NettySender> outbound = new HashMap<>();
+
+        for (ConnectionManager client : clients) {
+            String clientId = client.consistentId();
+            NettySender toServer = client.channel(server.consistentId(), server.getLocalAddress())
+                .get(3, TimeUnit.SECONDS);
+
+            outbound.put(clientId, toServer);
+        }
+
+        assertTrue(waitForCondition(() -> server.channels().size() == clientCount, TimeUnit.SECONDS.toMillis(3)));
+
+        Map<String, NettySender> inbound = server.channels();
+
+        // Each server-side channel must be the inbound end of a connection opened by the matching client.
+        for (Map.Entry<String, NettySender> entry : outbound.entrySet()) {
+            NettySender toClient = inbound.get(entry.getKey());
+
+            assertNotNull(toClient);
+            assertEquals(entry.getValue().channel().localAddress(), toClient.channel().remoteAddress());
+        }
+    }
+
+    /**
      * @return Generates handshake scenarios.
      */
     private static List<HandshakeScenario> handshakeScenarios() {
@@ -310,7 +354,7 @@ public class RecoveryHandshakeTest {
     }
 
     /**
-     * Create and start a {@link ConnectionManager} adding it to the {@link #startedManagers} list.
+     * Starts a {@link ConnectionManager} adding it to the {@link #startedManagers} list.
      *
      * @param port Port for the {@link ConnectionManager#server}.
      * @param serverHandshakeFailAt At what stage to fail server handshake.
@@ -344,4 +388,32 @@ public class RecoveryHandshakeTest {
         return manager;
     }
 
+    /**
+     * Starts a {@link ConnectionManager} with non-failing handshake managers and adds it to {@link #startedManagers}.
+     *
+     * @param port Port for the {@link ConnectionManager#server}.
+     * @return Started connection manager.
+     */
+    private ConnectionManager startManager(int port) {
+        TestMessageSerializationRegistryImpl serializationRegistry = new TestMessageSerializationRegistryImpl();
+
+        NetworkMessagesFactory msgFactory = new NetworkMessagesFactory();
+
+        UUID launchUuid = UUID.randomUUID();
+        String nodeId = UUID.randomUUID().toString();
+
+        ConnectionManager connectionManager = new ConnectionManager(
+            port,
+            serializationRegistry,
+            nodeId,
+            () -> new RecoveryServerHandshakeManager(launchUuid, nodeId, msgFactory),
+            () -> new RecoveryClientHandshakeManager(launchUuid, nodeId, msgFactory)
+        );
+
+        connectionManager.start();
+
+        startedManagers.add(connectionManager);
+
+        return connectionManager;
+    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 1570026..ffed4ad 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -36,10 +36,10 @@ import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
@@ -102,7 +102,7 @@ public class ConnectionManager {
         this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
         this.server = new NettyServer(
             port,
-            serverHandshakeManagerFactory.get(),
+            serverHandshakeManagerFactory,
             this::onNewIncomingChannel,
             this::onMessage,
             serializationRegistry
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
index ba961fa..600c396 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
@@ -69,7 +70,7 @@ public class NettyServer {
     private final BiConsumer<SocketAddress, NetworkMessage> messageListener;
 
     /** Handshake manager. */
-    private final HandshakeManager handshakeManager;
+    private final Supplier<HandshakeManager> handshakeManager;
 
     /** Server start future. */
     private CompletableFuture<Void> serverStartFuture;
@@ -94,14 +95,14 @@ public class NettyServer {
      * Constructor.
      *
      * @param port Server port.
-     * @param handshakeManager Handshake manager.
+     * @param handshakeManager Handshake manager supplier.
      * @param newConnectionListener New connections listener.
      * @param messageListener Message listener.
      * @param serializationRegistry Serialization registry.
      */
     public NettyServer(
         int port,
-        HandshakeManager handshakeManager,
+        Supplier<HandshakeManager> handshakeManager,
         Consumer<NettySender> newConnectionListener,
         BiConsumer<SocketAddress, NetworkMessage> messageListener,
         MessageSerializationRegistry serializationRegistry
@@ -114,7 +115,7 @@ public class NettyServer {
      *
      * @param bootstrap Server bootstrap.
      * @param port Server port.
-     * @param handshakeManager Handshake manager.
+     * @param handshakeManager Handshake manager supplier.
      * @param newConnectionListener New connections listener.
      * @param messageListener Message listener.
      * @param serializationRegistry Serialization registry.
@@ -122,7 +123,7 @@ public class NettyServer {
     public NettyServer(
         ServerBootstrap bootstrap,
         int port,
-        HandshakeManager handshakeManager,
+        Supplier<HandshakeManager> handshakeManager,
         Consumer<NettySender> newConnectionListener,
         BiConsumer<SocketAddress, NetworkMessage> messageListener,
         MessageSerializationRegistry serializationRegistry
@@ -153,6 +154,9 @@ public class NettyServer {
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     /** {@inheritDoc} */
                     @Override public void initChannel(SocketChannel ch) {
+                        // Get handshake manager for the new channel.
+                        HandshakeManager manager = handshakeManager.get();
+
                         ch.pipeline().addLast(
                             /*
                              * Decoder that uses the MessageReader
@@ -160,7 +164,7 @@ public class NettyServer {
                              */
                             new InboundDecoder(serializationRegistry),
                             // Handshake handler.
-                            new HandshakeHandler(handshakeManager),
+                            new HandshakeHandler(manager),
                             // Handles decoded NetworkMessages.
                             new MessageHandler(messageListener),
                             /*
@@ -173,7 +177,7 @@ public class NettyServer {
                             new IoExceptionSuppressingHandler()
                         );
 
-                        handshakeManager.handshakeFuture().thenAccept(newConnectionListener);
+                        manager.handshakeFuture().thenAccept(newConnectionListener);
                     }
                 })
                 /*
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
index dbef451..6140cd5 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
@@ -28,10 +28,10 @@ import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.embedded.EmbeddedChannel;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.internal.network.handshake.HandshakeAction;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.NetworkMessage;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index 74fb72d..c2c99af 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -32,10 +32,10 @@ import io.netty.channel.ServerChannel;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.internal.network.handshake.HandshakeAction;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.serialization.MessageDeserializer;
 import org.apache.ignite.network.serialization.MessageMappingException;
 import org.apache.ignite.network.serialization.MessageReader;
@@ -201,7 +201,7 @@ public class NettyServerTest {
                 }
             });
 
-        server = new NettyServer(4000, handshakeManager, sender -> {}, (socketAddress, message) -> {}, registry);
+        server = new NettyServer(4000, () -> handshakeManager, sender -> {}, (socketAddress, message) -> {}, registry);
 
         server.start().get(3, TimeUnit.SECONDS);
 
@@ -258,7 +258,7 @@ public class NettyServerTest {
 
         Mockito.doReturn(future).when(bootstrap).bind(Mockito.anyInt());
 
-        var server = new NettyServer(bootstrap, 0, mock(HandshakeManager.class), null, null, null);
+        var server = new NettyServer(bootstrap, 0, () -> mock(HandshakeManager.class), null, null, null);
 
         try {
             server.start().get(3, TimeUnit.SECONDS);