You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/08/11 08:01:09 UTC
[ignite-3] branch main updated: IGNITE-15275 Fix handshake operation (#265)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 93afd72 IGNITE-15275 Fix handshake operation (#265)
93afd72 is described below
commit 93afd72f9e7b3f4c59bcc02b5c7ec5a6955dd0f1
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Wed Aug 11 11:00:59 2021 +0300
IGNITE-15275 Fix handshake operation (#265)
---
.../internal/testframework/IgniteTestUtils.java | 25 +++++++-
.../network/recovery/RecoveryHandshakeTest.java | 74 +++++++++++++++++++++-
.../internal/network/netty/ConnectionManager.java | 4 +-
.../ignite/internal/network/netty/NettyServer.java | 18 ++++--
.../internal/network/netty/NettyClientTest.java | 4 +-
.../internal/network/netty/NettyServerTest.java | 8 +--
6 files changed, 116 insertions(+), 17 deletions(-)
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index 83043a5..838b7eb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -19,11 +19,12 @@ package org.apache.ignite.internal.testframework;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
-
+import java.util.function.BooleanSupplier;
import org.apache.ignite.lang.IgniteInternalException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static java.lang.Thread.sleep;
import static org.junit.jupiter.api.Assertions.fail;
/**
@@ -184,4 +185,26 @@ public final class IgniteTestUtils {
return false;
}
+
+    /**
+     * Polls the condition until it holds or the timeout elapses.
+     * <p>
+     * The condition is always evaluated at least once: after the deadline has passed it is checked one final time,
+     * so a zero or negative timeout still performs a single check, and a condition that became true during the
+     * last sleep interval is not misreported as a timeout.
+     *
+     * @param cond Condition.
+     * @param timeoutMillis Timeout in milliseconds.
+     * @return {@code True} if the condition was satisfied, {@code false} if it still did not hold at the final check.
+     * @throws InterruptedException If waiting was interrupted.
+     */
+    @SuppressWarnings("BusyWait") public static boolean waitForCondition(BooleanSupplier cond, long timeoutMillis)
+        throws InterruptedException {
+        long stop = System.currentTimeMillis() + timeoutMillis;
+
+        long remaining;
+
+        while ((remaining = stop - System.currentTimeMillis()) > 0) {
+            if (cond.getAsBoolean())
+                return true;
+
+            // Poll every 50 ms, but never sleep past the deadline.
+            sleep(Math.min(50, remaining));
+        }
+
+        // Final check covers a non-positive timeout and a condition satisfied during the last sleep.
+        return cond.getAsBoolean();
+    }
}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/RecoveryHandshakeTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/RecoveryHandshakeTest.java
index f2b5456..42c6e7e 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/RecoveryHandshakeTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/RecoveryHandshakeTest.java
@@ -18,7 +18,9 @@
package org.apache.ignite.internal.network.recovery;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import io.netty.channel.Channel;
@@ -45,8 +47,10 @@ import static org.apache.ignite.internal.network.recovery.RecoveryHandshakeTest.
import static org.apache.ignite.internal.network.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CONNECTION_OPENED;
import static org.apache.ignite.internal.network.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_DOESNT_FAIL;
import static org.apache.ignite.internal.network.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_INIT;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Recovery protocol handshake tests.
@@ -139,6 +143,46 @@ public class RecoveryHandshakeTest {
}
/**
+     * Verifies that a single server completes the handshake with several clients connecting to it: every client
+     * opens a channel to the server, and the server must end up with one inbound channel per client.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testHandshakeWithMultipleClients() throws Exception {
+        final int clientCount = 10;
+
+        ConnectionManager server = startManager(4000);
+
+        // Start all the clients first, on the ports following the server's one.
+        List<ConnectionManager> clients = new ArrayList<>();
+
+        for (int port = 4001; port < 4001 + clientCount; port++)
+            clients.add(startManager(port));
+
+        // Client's consistent id -> channel opened by that client to the server.
+        Map<String, NettySender> outbound = new HashMap<>();
+
+        for (ConnectionManager client : clients) {
+            String clientId = client.consistentId();
+
+            NettySender toServer = client.channel(server.consistentId(), server.getLocalAddress())
+                .get(3, TimeUnit.SECONDS);
+
+            outbound.put(clientId, toServer);
+        }
+
+        boolean allConnected = waitForCondition(
+            () -> server.channels().size() == clientCount,
+            TimeUnit.SECONDS.toMillis(3)
+        );
+
+        assertTrue(allConnected);
+
+        Map<String, NettySender> inbound = server.channels();
+
+        // Each server-side channel must be the inbound end of the connection opened by the matching client.
+        for (Map.Entry<String, NettySender> entry : outbound.entrySet()) {
+            NettySender toClient = inbound.get(entry.getKey());
+
+            assertNotNull(toClient);
+
+            assertEquals(entry.getValue().channel().localAddress(), toClient.channel().remoteAddress());
+        }
+    }
+
+ /**
* @return Generates handshake scenarios.
*/
private static List<HandshakeScenario> handshakeScenarios() {
@@ -310,7 +354,7 @@ public class RecoveryHandshakeTest {
}
/**
- * Create and start a {@link ConnectionManager} adding it to the {@link #startedManagers} list.
+ * Starts a {@link ConnectionManager} adding it to the {@link #startedManagers} list.
*
* @param port Port for the {@link ConnectionManager#server}.
* @param serverHandshakeFailAt At what stage to fail server handshake.
@@ -344,4 +388,32 @@ public class RecoveryHandshakeTest {
return manager;
}
+    /**
+     * Creates a {@link ConnectionManager} whose server and client handshakes use the regular recovery handshake
+     * managers, starts it and registers it in the {@link #startedManagers} list.
+     *
+     * @param port Port.
+     * @return Started connection manager.
+     */
+    private ConnectionManager startManager(int port) {
+        TestMessageSerializationRegistryImpl registry = new TestMessageSerializationRegistryImpl();
+
+        NetworkMessagesFactory messageFactory = new NetworkMessagesFactory();
+
+        UUID launchId = UUID.randomUUID();
+
+        String consistentId = UUID.randomUUID().toString();
+
+        // A new handshake manager is produced on every supplier call.
+        ConnectionManager connectionManager = new ConnectionManager(
+            port,
+            registry,
+            consistentId,
+            () -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
+            () -> new RecoveryClientHandshakeManager(launchId, consistentId, messageFactory)
+        );
+
+        connectionManager.start();
+
+        startedManagers.add(connectionManager);
+
+        return connectionManager;
+    }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 1570026..ffed4ad 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -36,10 +36,10 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -102,7 +102,7 @@ public class ConnectionManager {
this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
this.server = new NettyServer(
port,
- serverHandshakeManagerFactory.get(),
+ serverHandshakeManagerFactory,
this::onNewIncomingChannel,
this::onMessage,
serializationRegistry
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
index ba961fa..600c396 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.Supplier;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
@@ -69,7 +70,7 @@ public class NettyServer {
private final BiConsumer<SocketAddress, NetworkMessage> messageListener;
/** Handshake manager. */
- private final HandshakeManager handshakeManager;
+ private final Supplier<HandshakeManager> handshakeManager;
/** Server start future. */
private CompletableFuture<Void> serverStartFuture;
@@ -94,14 +95,14 @@ public class NettyServer {
* Constructor.
*
* @param port Server port.
- * @param handshakeManager Handshake manager.
+ * @param handshakeManager Handshake manager supplier.
* @param newConnectionListener New connections listener.
* @param messageListener Message listener.
* @param serializationRegistry Serialization registry.
*/
public NettyServer(
int port,
- HandshakeManager handshakeManager,
+ Supplier<HandshakeManager> handshakeManager,
Consumer<NettySender> newConnectionListener,
BiConsumer<SocketAddress, NetworkMessage> messageListener,
MessageSerializationRegistry serializationRegistry
@@ -114,7 +115,7 @@ public class NettyServer {
*
* @param bootstrap Server bootstrap.
* @param port Server port.
- * @param handshakeManager Handshake manager.
+ * @param handshakeManager Handshake manager supplier.
* @param newConnectionListener New connections listener.
* @param messageListener Message listener.
* @param serializationRegistry Serialization registry.
@@ -122,7 +123,7 @@ public class NettyServer {
public NettyServer(
ServerBootstrap bootstrap,
int port,
- HandshakeManager handshakeManager,
+ Supplier<HandshakeManager> handshakeManager,
Consumer<NettySender> newConnectionListener,
BiConsumer<SocketAddress, NetworkMessage> messageListener,
MessageSerializationRegistry serializationRegistry
@@ -153,6 +154,9 @@ public class NettyServer {
.childHandler(new ChannelInitializer<SocketChannel>() {
/** {@inheritDoc} */
@Override public void initChannel(SocketChannel ch) {
+ // Get handshake manager for the new channel.
+ HandshakeManager manager = handshakeManager.get();
+
ch.pipeline().addLast(
/*
* Decoder that uses the MessageReader
@@ -160,7 +164,7 @@ public class NettyServer {
*/
new InboundDecoder(serializationRegistry),
// Handshake handler.
- new HandshakeHandler(handshakeManager),
+ new HandshakeHandler(manager),
// Handles decoded NetworkMessages.
new MessageHandler(messageListener),
/*
@@ -173,7 +177,7 @@ public class NettyServer {
new IoExceptionSuppressingHandler()
);
- handshakeManager.handshakeFuture().thenAccept(newConnectionListener);
+ manager.handshakeFuture().thenAccept(newConnectionListener);
}
})
/*
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
index dbef451..6140cd5 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
@@ -28,10 +28,10 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.embedded.EmbeddedChannel;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.internal.network.handshake.HandshakeAction;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.NetworkMessage;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index 74fb72d..c2c99af 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -32,10 +32,10 @@ import io.netty.channel.ServerChannel;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.internal.network.handshake.HandshakeAction;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.serialization.MessageDeserializer;
import org.apache.ignite.network.serialization.MessageMappingException;
import org.apache.ignite.network.serialization.MessageReader;
@@ -201,7 +201,7 @@ public class NettyServerTest {
}
});
- server = new NettyServer(4000, handshakeManager, sender -> {}, (socketAddress, message) -> {}, registry);
+ server = new NettyServer(4000, () -> handshakeManager, sender -> {}, (socketAddress, message) -> {}, registry);
server.start().get(3, TimeUnit.SECONDS);
@@ -258,7 +258,7 @@ public class NettyServerTest {
Mockito.doReturn(future).when(bootstrap).bind(Mockito.anyInt());
- var server = new NettyServer(bootstrap, 0, mock(HandshakeManager.class), null, null, null);
+ var server = new NettyServer(bootstrap, 0, () -> mock(HandshakeManager.class), null, null, null);
try {
server.start().get(3, TimeUnit.SECONDS);