You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2021/11/22 18:23:35 UTC
[ignite-3] branch ignite-15307 updated: Fixing NettyServerTest
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch ignite-15307
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-15307 by this push:
new 90a2792 Fixing NettyServerTest
90a2792 is described below
commit 90a27927a0a9ec95cd40dd0c8fb73053ca08ca19
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Mon Nov 22 21:23:25 2021 +0300
Fixing NettyServerTest
---
.../ignite/internal/network/netty/NettyServer.java | 55 +++++-----------------
.../internal/network/netty/NettyServerTest.java | 15 +++---
2 files changed, 20 insertions(+), 50 deletions(-)
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
index d411294..91ceb0f 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
@@ -23,7 +23,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.stream.ChunkedWriteHandler;
import java.net.SocketAddress;
import java.util.concurrent.CancellationException;
@@ -48,8 +47,8 @@ public class NettyServer {
/** A lock for start and stop operations. */
private final Object startStopLock = new Object();
- /** {@link NioServerSocketChannel} bootstrapper. */
- private final ServerBootstrap bootstrap;
+ /** Bootstrap factory. */
+ private final NettyBootstrapFactory bootstrapFactory;
/** Server socket configuration. */
private final NetworkView configuration;
@@ -98,43 +97,12 @@ public class NettyServer {
MessageSerializationRegistry serializationRegistry,
NettyBootstrapFactory bootstrapFactory
) {
- this(
- consistentId,
- bootstrapFactory.createServerBootstrap(),
- configuration,
- handshakeManager,
- newConnectionListener,
- messageListener,
- serializationRegistry
- );
- }
-
- /**
- * Constructor.
- *
- * @param consistentId Consistent id.
- * @param bootstrap Server bootstrap.
- * @param configuration Server configuration.
- * @param handshakeManager Handshake manager supplier.
- * @param newConnectionListener New connections listener.
- * @param messageListener Message listener.
- * @param serializationRegistry Serialization registry.
- */
- public NettyServer(
- String consistentId,
- ServerBootstrap bootstrap,
- NetworkView configuration,
- Supplier<HandshakeManager> handshakeManager,
- Consumer<NettySender> newConnectionListener,
- BiConsumer<SocketAddress, NetworkMessage> messageListener,
- MessageSerializationRegistry serializationRegistry
- ) {
- this.bootstrap = bootstrap;
this.configuration = configuration;
this.handshakeManager = handshakeManager;
this.newConnectionListener = newConnectionListener;
this.messageListener = messageListener;
this.serializationRegistry = serializationRegistry;
+ this.bootstrapFactory = bootstrapFactory;
}
/**
@@ -151,6 +119,8 @@ public class NettyServer {
if (serverStartFuture != null) {
throw new IgniteInternalException("Attempted to start an already started server");
}
+
+ ServerBootstrap bootstrap = bootstrapFactory.createServerBootstrap();
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
/** {@inheritDoc} */
@@ -188,7 +158,7 @@ public class NettyServer {
var bindFuture = new CompletableFuture<Channel>();
- tryBind(port, port + portRange, bindFuture);
+ tryBind(bootstrap, port, port + portRange, bindFuture);
serverStartFuture = bindFuture
.handle((channel, err) -> {
@@ -217,11 +187,12 @@ public class NettyServer {
/**
* Try bind this server to a port.
*
- * @param port Target port.
- * @param endPort Last port that server can be bound to.
- * @param fut Future.
+ * @param bootstrap Bootstrap.
+ * @param port Target port.
+ * @param endPort Last port that server can be bound to.
+ * @param fut Future.
*/
- private void tryBind(int port, int endPort, CompletableFuture<Channel> fut) {
+ private void tryBind(ServerBootstrap bootstrap, int port, int endPort, CompletableFuture<Channel> fut) {
if (port > endPort) {
fut.completeExceptionally(new IllegalStateException("No available port in range"));
}
@@ -232,7 +203,7 @@ public class NettyServer {
} else if (future.isCancelled()) {
fut.cancel(true);
} else {
- tryBind(port + 1, endPort, fut);
+ tryBind(bootstrap,port + 1, endPort, fut);
}
});
}
@@ -268,7 +239,7 @@ public class NettyServer {
channel.close();
}
- return serverCloseFuture;
+ return serverCloseFuture == null ? CompletableFuture.<Void>completedFuture(null) : serverCloseFuture;
}).thenCompose(Function.identity());
}
}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index a799f3d..2c93f49 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -26,7 +26,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import io.netty.bootstrap.Bootstrap;
-import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
@@ -41,11 +40,13 @@ import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
+import org.apache.ignite.configuration.schemas.network.NetworkView;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.network.handshake.HandshakeAction;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.serialization.MessageDeserializer;
import org.apache.ignite.network.serialization.MessageMappingException;
@@ -210,7 +211,7 @@ public class NettyServerTest {
(socketAddress, message) -> {
},
registry,
- null
+ new NettyBootstrapFactory(serverCfg.value(), "")
);
server.start().get(3, TimeUnit.SECONDS);
@@ -267,19 +268,17 @@ public class NettyServerTest {
* @return NettyServer.
*/
private NettyServer getServer(ChannelFuture future, boolean shouldStart) {
- ServerBootstrap bootstrap = Mockito.spy(new ServerBootstrap());
-
- Mockito.doReturn(future).when(bootstrap).bind(Mockito.anyInt());
+ var bootstrapFactory = Mockito.spy(new NettyBootstrapFactory(Mockito.mock(NetworkView.class), ""));
var server = new NettyServer(
"test",
- bootstrap,
serverCfg.value(),
() -> mock(HandshakeManager.class),
null,
null,
- null
- );
+ null,
+ bootstrapFactory
+ );
try {
server.start().get(3, TimeUnit.SECONDS);