You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/21 01:30:04 UTC

[GitHub] [flink] 1996fanrui commented on a diff in pull request #21351: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel

1996fanrui commented on code in PR #21351:
URL: https://github.com/apache/flink/pull/21351#discussion_r1027398763


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java:
##########
@@ -164,6 +166,58 @@ private void checkReuseNettyPartitionRequestClient(
         assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections);
     }
 
+    /**
+     * Verify that the netty client reuse when the netty server closes the channel and there is no
+     * input channel.
+     */
+    @TestTemplate
+    void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception {
+        CompletableFuture<Void> inactiveFuture = new CompletableFuture<>();
+        NettyProtocol protocol =
+                new NettyProtocol(null, null) {
+
+                    @Override
+                    public ChannelHandler[] getServerChannelHandlers() {
+                        return new ChannelHandler[] {
+                            // Close on read
+                            new ChannelInboundHandlerAdapter() {
+                                @Override
+                                public void channelRead(ChannelHandlerContext ctx, Object msg) {
+                                    ctx.channel().close();
+                                }
+                            }
+                        };
+                    }
+
+                    @Override
+                    public ChannelHandler[] getClientChannelHandlers() {
+                        return new ChannelHandler[] {
+                            new ChannelInactiveFutureHandler(inactiveFuture)
+                        };
+                    }
+                };
+        NettyTestUtil.NettyServerAndClient serverAndClient = initServerAndClient(protocol);
+
+        PartitionRequestClientFactory factory =
+                new PartitionRequestClientFactory(
+                        serverAndClient.client(), 2, 1, connectionReuseEnabled);
+
+        ConnectionID connectionID = serverAndClient.getConnectionID(RESOURCE_ID, 0);
+        NettyPartitionRequestClient oldClient = factory.createPartitionRequestClient(connectionID);
+
+        assertThat(oldClient.hasError()).isFalse();
+
+        // Write something to trigger close by server
+        oldClient.writeAndFlush(Unpooled.buffer().writerIndex(16));
+
+        inactiveFuture.get();
+        assertThat(oldClient.hasError()).isTrue();
+
+        NettyPartitionRequestClient newClient = factory.createPartitionRequestClient(connectionID);
+        assertThat(newClient).as("Factory should create a new client.").isNotSameAs(oldClient);
+        shutdown(serverAndClient);

Review Comment:
   Thanks for your great suggestion, it makes sense, updated.
   
   BTW, I removed the `assertThat(serverChannelFuture).isDone();` because channelRegistered is not called synchronously, unit test sometimes fails.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org