You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/20 14:32:48 UTC

[GitHub] [flink] reswqa commented on a diff in pull request #21351: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel

reswqa commented on code in PR #21351:
URL: https://github.com/apache/flink/pull/21351#discussion_r1027297678


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java:
##########
@@ -164,6 +171,58 @@ private void checkReuseNettyPartitionRequestClient(
         assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections);
     }
 
+    /**
+     * Verify that the netty client reuse when the netty server closes the channel and there is no
+     * input channel.
+     */
+    @TestTemplate
+    void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception {
+        NettyProtocol protocol =
+                new NettyProtocol(
+                        mock(ResultPartitionProvider.class), mock(TaskEventDispatcher.class)) {

Review Comment:
   We should not introduce mock into the test class. Please refer to [Avoid Mockito](https://flink.apache.org/contributing/code-style-and-quality-common.html).



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java:
##########
@@ -277,6 +270,16 @@ private boolean canBeReused() {
         return clientFactory.isConnectionReuseEnabled() && !clientHandler.hasChannelError();
     }
 
+    @VisibleForTesting
+    public Channel getTcpChannel() {

Review Comment:
   I think we can get the handler directly through the channel(via `channel.pipeline().get(NetworkClientHandler.class`), so we don't need to introduce the next method.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java:
##########
@@ -277,6 +270,16 @@ private boolean canBeReused() {
         return clientFactory.isConnectionReuseEnabled() && !clientHandler.hasChannelError();
     }
 
+    @VisibleForTesting
+    public Channel getTcpChannel() {
+        return tcpChannel;
+    }
+
+    @VisibleForTesting

Review Comment:
   These two `@VisibleForTesting` methods should probably be avoided. For the connection reuse scenario, we are doing black box testing and should rely less on internal state to make assertions. I suggest that we consider another approach to trigger the close of the serverHandler.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java:
##########
@@ -164,6 +171,58 @@ private void checkReuseNettyPartitionRequestClient(
         assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections);
     }
 
+    /**
+     * Verify that the netty client reuse when the netty server closes the channel and there is no
+     * input channel.
+     */
+    @TestTemplate
+    void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception {
+        NettyProtocol protocol =
+                new NettyProtocol(
+                        mock(ResultPartitionProvider.class), mock(TaskEventDispatcher.class)) {
+
+                    @Override
+                    public ChannelHandler[] getServerChannelHandlers() {
+                        return new ChannelHandler[] {
+                            // Close on read
+                            new ChannelInboundHandlerAdapter() {
+                                @Override
+                                public void channelRead(ChannelHandlerContext ctx, Object msg) {
+                                    ctx.channel().close();
+                                }
+                            }
+                        };
+                    }
+
+                    @Override
+                    public ChannelHandler[] getClientChannelHandlers() {
+                        return new ChannelHandler[] {new ChannelInactiveFutureHandler()};
+                    }
+                };
+        NettyTestUtil.NettyServerAndClient serverAndClient = initServerAndClient(protocol);
+
+        PartitionRequestClientFactory factory =
+                new PartitionRequestClientFactory(
+                        serverAndClient.client(), 2, 1, connectionReuseEnabled);
+
+        ConnectionID connectionID = serverAndClient.getConnectionID(RESOURCE_ID, 0);
+        NettyPartitionRequestClient oldClient = factory.createPartitionRequestClient(connectionID);
+
+        ChannelInactiveFutureHandler clientHandler =
+                (ChannelInactiveFutureHandler) oldClient.getClientHandler();
+        assertFalse(clientHandler.hasChannelError());

Review Comment:
   Please use assertj instead of junit assertion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org