You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@pekko.apache.org by "He-Pin (via GitHub)" <gi...@apache.org> on 2023/08/01 10:01:24 UTC

[GitHub] [incubator-pekko] He-Pin commented on a diff in pull request #486: Migrate multi node testkit to Netty 4.

He-Pin commented on code in PR #486:
URL: https://github.com/apache/incubator-pekko/pull/486#discussion_r1280396169


##########
multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala:
##########
@@ -94,44 +90,72 @@ private[pekko] case object Client extends Role
  */
 private[pekko] case object Server extends Role
 
+/**
+ * INTERNAL API.
+ */
+private[pekko] trait RemoteConnection {
+  def channel: Channel
+  def shutdown(): Unit
+}
+
 /**
  * INTERNAL API.
  */
 private[pekko] object RemoteConnection {
-  def apply(role: Role, sockaddr: InetSocketAddress, poolSize: Int, handler: ChannelUpstreamHandler): Channel = {
+  def apply(
+      role: Role,
+      sockaddr: InetSocketAddress,
+      poolSize: Int,
+      handler: ChannelInboundHandler): RemoteConnection = {
     role match {
       case Client =>
-        val socketfactory =
-          new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool, poolSize)
-        val bootstrap = new ClientBootstrap(socketfactory)
-        bootstrap.setPipelineFactory(new TestConductorPipelineFactory(handler))
-        bootstrap.setOption("tcpNoDelay", true)
-        bootstrap.connect(sockaddr).getChannel
+        val bootstrap = new Bootstrap()
+        val eventLoopGroup = new NioEventLoopGroup(poolSize)
+        val clientChannel = bootstrap
+          .group(eventLoopGroup)
+          .channel(classOf[NioSocketChannel])
+          .handler(new TestConductorPipelineFactory(handler))
+          .option[java.lang.Boolean](ChannelOption.TCP_NODELAY, true)
+          .connect(sockaddr)
+          .sync()
+          .channel()
+        new RemoteConnection {
+          override def channel: Channel = clientChannel
+          override def shutdown(): Unit = {
+            clientChannel.close().sync()
+            eventLoopGroup.shutdownGracefully()
+            eventLoopGroup.terminationFuture().sync()
+          }
+        }
+
       case Server =>
-        val socketfactory =
-          new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool, poolSize)
-        val bootstrap = new ServerBootstrap(socketfactory)
-        bootstrap.setPipelineFactory(new TestConductorPipelineFactory(handler))
-        bootstrap.setOption("reuseAddress", !Helpers.isWindows)
-        bootstrap.setOption("child.tcpNoDelay", true)
-        bootstrap.bind(sockaddr)
+        val bootstrap = new ServerBootstrap()
+        val parentEventLoopGroup = new NioEventLoopGroup(poolSize)
+        val childEventLoopGroup = new NioEventLoopGroup(poolSize)
+        val serverChannel = bootstrap
+          .group(parentEventLoopGroup, childEventLoopGroup)
+          .channel(classOf[NioServerSocketChannel])
+          .childHandler(new TestConductorPipelineFactory(handler))
+          .option[java.lang.Boolean](ChannelOption.SO_REUSEADDR, !Helpers.isWindows)
+          .childOption[java.lang.Boolean](ChannelOption.TCP_NODELAY, true)
+          .bind(sockaddr)
+          .sync()
+          .channel()

Review Comment:
   I was expected the channel is ready to be connected.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org