Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/20 08:13:27 UTC

[GitHub] [flink] 1996fanrui opened a new pull request, #21351: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel

1996fanrui opened a new pull request, #21351:
URL: https://github.com/apache/flink/pull/21351

   ## What is the purpose of the change
   
   Fix the bug that the old netty client isn't closed when the netty server closes the channel and there is no input channel.
   
   `CreditBasedPartitionRequestClientHandler#channelInactive` doesn't call `notifyAllChannelsOfErrorAndClose` when `inputChannels.isEmpty()` is true.
   
   https://github.com/apache/flink/blob/d745f5b3f7a64445854c735668afa9b72edb3fee/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java#L126
   
   ![image](https://user-images.githubusercontent.com/38427477/202892039-7692b054-e502-4b3f-9de6-4f67c6b04f6f.png)
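The guard described above can be illustrated with a heavily simplified model. `GuardedClientHandler` and its fields are hypothetical stand-ins, not Flink's actual `CreditBasedPartitionRequestClientHandler`; the sketch only assumes that the error state reported by `hasChannelError()` is recorded inside `notifyAllChannelsOfErrorAndClose`.

```java
import java.util.HashMap;
import java.util.Map;

// Entry point first so the single-file launcher finds main().
class GuardedHandlerDemo {
    public static void main(String[] args) {
        // Idle connection: no input channels registered (e.g. between two jobs).
        GuardedClientHandler idle = new GuardedClientHandler();
        idle.channelInactive();
        System.out.println("idle connection flagged: " + idle.hasChannelError());

        // Busy connection: at least one input channel registered.
        GuardedClientHandler busy = new GuardedClientHandler();
        busy.inputChannels.put(1, "input-channel-1");
        busy.channelInactive();
        System.out.println("busy connection flagged: " + busy.hasChannelError());
    }
}

/** Hypothetical simplified model of the client handler; not Flink code. */
class GuardedClientHandler {
    // Registered input channels, keyed by a hypothetical integer ID.
    final Map<Integer, String> inputChannels = new HashMap<>();
    // First error seen on this connection; null means "healthy".
    private Throwable channelError;

    /** Invoked when the TCP connection becomes inactive, e.g. the server closed it. */
    void channelInactive() {
        // The guard in question: with no input channels, nothing is recorded.
        if (!inputChannels.isEmpty()) {
            notifyAllChannelsOfErrorAndClose(
                    new IllegalStateException("Connection unexpectedly closed by remote"));
        }
    }

    void notifyAllChannelsOfErrorAndClose(Throwable cause) {
        // Keep only the first error, then drop all registered channels.
        if (channelError == null) {
            channelError = cause;
        }
        inputChannels.clear();
    }

    boolean hasChannelError() {
        return channelError != null;
    }
}
```

With no input channels, the closed connection still looks healthy to any caller that only checks `hasChannelError()`.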
   
   
   With TCP connection reuse across multiple jobs (the scenario of FLINK-28695), a connection can stay open after the old job stops and before the new job starts, while no input channels are registered. During this window, `CreditBasedPartitionRequestClientHandler#channelInactive` doesn't call `notifyAllChannelsOfErrorAndClose`. If the TCP connection is disconnected during this period, the new job should create a new TCP connection. However, `PartitionRequestClientFactory` doesn't know that the TCP connection is broken, so the new job reuses the failed connection.
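The reuse problem can be sketched with a hypothetical factory that caches one client per connection ID and reuses it only while no error has been recorded, mirroring the `!clientHandler.hasChannelError()` part of the `canBeReused()` condition quoted later in this thread. The sketch assumes connection reuse is enabled and is single-threaded; `SketchClientFactory` and `SketchClient` are illustrative names, not Flink classes.

```java
import java.util.HashMap;
import java.util.Map;

class ReuseDemo {
    public static void main(String[] args) {
        SketchClientFactory factory = new SketchClientFactory();
        SketchClient oldClient = factory.createClient("taskmanager-1:0");

        // The TCP connection breaks between two jobs, but no error is recorded.
        oldClient.connectionBroken = true;
        SketchClient reused = factory.createClient("taskmanager-1:0");
        System.out.println("broken connection reused: " + (reused == oldClient));

        // Once the error is recorded, the factory replaces the client.
        oldClient.hasChannelError = true;
        SketchClient replaced = factory.createClient("taskmanager-1:0");
        System.out.println("client replaced: " + (replaced != oldClient));
    }
}

/** Hypothetical client: only hasChannelError is visible to the factory. */
class SketchClient {
    // Actual state of the TCP connection; the factory never looks at it.
    boolean connectionBroken;
    // Set only when the handler records an error for this connection.
    boolean hasChannelError;
}

/** Hypothetical single-threaded factory, assuming reuse across jobs is enabled. */
class SketchClientFactory {
    private final Map<String, SketchClient> clients = new HashMap<>();

    SketchClient createClient(String connectionId) {
        // Reuse the cached client unless an error was recorded for it.
        SketchClient cached = clients.get(connectionId);
        if (cached != null && !cached.hasChannelError) {
            return cached;
        }
        SketchClient fresh = new SketchClient();
        clients.put(connectionId, fresh);
        return fresh;
    }
}
```

The factory can only act on what the handler records, so a broken but unflagged connection is handed to the next job.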
   
   
   ## Brief change log
   
   Close the old netty client when the netty server closes the channel, even if there are no input channels, so that it is not reused.
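One way to realize this change, shown on a simplified model, is to record the error on every unexpected close instead of only when input channels exist. This is a hypothetical sketch (`FixedClientHandler` is not a Flink class), and the merged code may be structured differently.

```java
import java.util.HashMap;
import java.util.Map;

class FixedHandlerDemo {
    public static void main(String[] args) {
        FixedClientHandler idle = new FixedClientHandler();
        idle.channelInactive();
        System.out.println("idle connection flagged: " + idle.hasChannelError());
    }
}

/** Hypothetical simplified handler without the isEmpty() guard; not Flink code. */
class FixedClientHandler {
    final Map<Integer, String> inputChannels = new HashMap<>();
    private Throwable channelError;

    void channelInactive() {
        // No isEmpty() guard: the error is recorded even with no input channels,
        // so a reuse check based on hasChannelError() rejects this connection.
        notifyAllChannelsOfErrorAndClose(
                new IllegalStateException("Connection unexpectedly closed by remote"));
    }

    void notifyAllChannelsOfErrorAndClose(Throwable cause) {
        if (channelError == null) {
            channelError = cause;
        }
        // With no registered channels this is a no-op.
        inputChannels.clear();
    }

    boolean hasChannelError() {
        return channelError != null;
    }
}
```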
   
   
   ## Verifying this change
   
   This change adds a test and can be verified as follows:
   
     - *PartitionRequestClientFactoryTest#testConnectionReuseWhenRemoteCloseAndNoInputChannel*
   
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not documented
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #21351: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #21351:
URL: https://github.com/apache/flink/pull/21351#discussion_r1027398763


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java:
##########
@@ -164,6 +166,58 @@ private void checkReuseNettyPartitionRequestClient(
         assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections);
     }
 
+    /**
+     * Verify that the netty client is not reused when the netty server closes the channel and there
+     * is no input channel.
+     */
+    @TestTemplate
+    void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception {
+        CompletableFuture<Void> inactiveFuture = new CompletableFuture<>();
+        NettyProtocol protocol =
+                new NettyProtocol(null, null) {
+
+                    @Override
+                    public ChannelHandler[] getServerChannelHandlers() {
+                        return new ChannelHandler[] {
+                            // Close on read
+                            new ChannelInboundHandlerAdapter() {
+                                @Override
+                                public void channelRead(ChannelHandlerContext ctx, Object msg) {
+                                    ctx.channel().close();
+                                }
+                            }
+                        };
+                    }
+
+                    @Override
+                    public ChannelHandler[] getClientChannelHandlers() {
+                        return new ChannelHandler[] {
+                            new ChannelInactiveFutureHandler(inactiveFuture)
+                        };
+                    }
+                };
+        NettyTestUtil.NettyServerAndClient serverAndClient = initServerAndClient(protocol);
+
+        PartitionRequestClientFactory factory =
+                new PartitionRequestClientFactory(
+                        serverAndClient.client(), 2, 1, connectionReuseEnabled);
+
+        ConnectionID connectionID = serverAndClient.getConnectionID(RESOURCE_ID, 0);
+        NettyPartitionRequestClient oldClient = factory.createPartitionRequestClient(connectionID);
+
+        assertThat(oldClient.hasError()).isFalse();
+
+        // Write something to trigger close by server
+        oldClient.writeAndFlush(Unpooled.buffer().writerIndex(16));
+
+        inactiveFuture.get();
+        assertThat(oldClient.hasError()).isTrue();
+
+        NettyPartitionRequestClient newClient = factory.createPartitionRequestClient(connectionID);
+        assertThat(newClient).as("Factory should create a new client.").isNotSameAs(oldClient);
+        shutdown(serverAndClient);

Review Comment:
   Thanks for the great suggestion; it makes sense, so I've updated the PR.
   
   BTW, I removed `assertThat(serverChannelFuture).isDone();` because `channelRegistered` is not called synchronously, so the unit test sometimes failed.
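The flakiness comes from asserting on a callback that runs on another thread. A minimal, Netty-free sketch of the pattern, assuming a single-threaded executor as a stand-in for the event loop: `isDone()` checked immediately may return either value, whereas a bounded `CompletableFuture#get(long, TimeUnit)` waits for the callback. `AsyncCallbackDemo` and `awaitRegistration` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncCallbackDemo {
    public static void main(String[] args) {
        System.out.println("registered channel: " + awaitRegistration(50));
    }

    /** Completes a future from another thread after delayMs, then waits for it. */
    static String awaitRegistration(long delayMs) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> registered = new CompletableFuture<>();
            // Stand-in for channelRegistered being invoked later on the event loop.
            eventLoop.execute(() -> {
                sleepQuietly(delayMs);
                registered.complete("server-channel");
            });
            // Racy: asserting on this value can fail, the callback may not have run yet.
            System.out.println("isDone() right away: " + registered.isDone());
            // Robust: block, with an upper bound, until the callback completed the future.
            return registered.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("Registration did not complete", e);
        } finally {
            eventLoop.shutdownNow();
        }
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The timeout keeps a genuinely broken registration from hanging the test forever while still tolerating scheduling delays.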





[GitHub] [flink] 1996fanrui commented on pull request #21351: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #21351:
URL: https://github.com/apache/flink/pull/21351#issuecomment-1321861598

   Thanks @reswqa and @pnowojski for the review.




[GitHub] [flink] 1996fanrui commented on pull request #21351: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #21351:
URL: https://github.com/apache/flink/pull/21351#issuecomment-1321165059

   > @1996fanrui Thanks for your contribution, I just left some comments, PTAL~
   
   Hi @reswqa, thanks a lot for your review, especially over the weekend.
   
   I have addressed all comments.
   
   > These two @VisibleForTesting methods should probably be avoided. For the connection reuse scenario, we are doing black box testing and should rely less on internal state to make assertions. I suggest that we consider another approach to trigger the close of the serverHandler.
   
   I now use `void writeAndFlush(Object msg)` and `boolean hasError()` instead of `Channel getTcpChannel()` and `NetworkClientHandler getClientHandler()`. Unlike the previous approach, this exposes behavior through the client's interface rather than its internal details.
   
   WDYT?
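The black-box style discussed here can be sketched as a check that talks only to a narrow interface. `ClientView` and `CloseOnReadFakeClient` are hypothetical; the fake simulates a server that closes the connection as soon as it reads a message, similar to the close-on-read server handler in the test.

```java
class BlackBoxDemo {
    public static void main(String[] args) {
        ClientView client = new CloseOnReadFakeClient();
        System.out.println("before write, hasError: " + client.hasError());
        // Sending anything makes the (fake) server close the connection.
        client.writeAndFlush("ping");
        System.out.println("after write, hasError: " + client.hasError());
    }
}

/** Hypothetical narrow surface: the only two operations the check relies on. */
interface ClientView {
    void writeAndFlush(Object msg);

    boolean hasError();
}

/** Hypothetical fake whose remote side closes the connection on the first read. */
class CloseOnReadFakeClient implements ClientView {
    private boolean closedByRemote;

    @Override
    public void writeAndFlush(Object msg) {
        // The fake server "reads" msg and closes the channel; the client records the error.
        closedByRemote = true;
    }

    @Override
    public boolean hasError() {
        return closedByRemote;
    }
}
```

Because the assertions only use `writeAndFlush` and `hasError`, the fake could be swapped for a real client without touching them.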




[GitHub] [flink] 1996fanrui commented on pull request #21351: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #21351:
URL: https://github.com/apache/flink/pull/21351#issuecomment-1321124984

   @flinkbot run azure




[GitHub] [flink] reswqa commented on a diff in pull request #21351: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #21351:
URL: https://github.com/apache/flink/pull/21351#discussion_r1027297678


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java:
##########
@@ -164,6 +171,58 @@ private void checkReuseNettyPartitionRequestClient(
         assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections);
     }
 
+    /**
+     * Verify that the netty client is not reused when the netty server closes the channel and there
+     * is no input channel.
+     */
+    @TestTemplate
+    void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception {
+        NettyProtocol protocol =
+                new NettyProtocol(
+                        mock(ResultPartitionProvider.class), mock(TaskEventDispatcher.class)) {

Review Comment:
   We should not introduce mocks into the test class. Please refer to [Avoid Mockito](https://flink.apache.org/contributing/code-style-and-quality-common.html).
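A common alternative to `mock(...)` is a small hand-written stub that implements the dependency and records calls. A minimal sketch; `PartitionProvider`, `RecordingPartitionProvider` and `RequestHandlerSketch` are hypothetical and do not match Flink's `ResultPartitionProvider` API. (The later revision of this test, quoted in the thread, passes `null` for both `NettyProtocol` dependencies because the overridden handlers never use them.)

```java
import java.util.ArrayList;
import java.util.List;

class StubDemo {
    public static void main(String[] args) {
        RecordingPartitionProvider provider = new RecordingPartitionProvider();
        RequestHandlerSketch handler = new RequestHandlerSketch(provider);
        System.out.println(handler.handleRequest(3));
        System.out.println("requested: " + provider.requested);
    }
}

/** Hypothetical dependency that a test would otherwise mock. */
interface PartitionProvider {
    String createView(int subpartitionIndex);
}

/** Hand-written stub: deterministic answers plus a record of the calls it received. */
class RecordingPartitionProvider implements PartitionProvider {
    final List<Integer> requested = new ArrayList<>();

    @Override
    public String createView(int subpartitionIndex) {
        requested.add(subpartitionIndex);
        return "view-" + subpartitionIndex;
    }
}

/** Hypothetical class under test that depends on PartitionProvider. */
class RequestHandlerSketch {
    private final PartitionProvider provider;

    RequestHandlerSketch(PartitionProvider provider) {
        this.provider = provider;
    }

    String handleRequest(int subpartitionIndex) {
        return provider.createView(subpartitionIndex);
    }
}
```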



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java:
##########
@@ -277,6 +270,16 @@ private boolean canBeReused() {
         return clientFactory.isConnectionReuseEnabled() && !clientHandler.hasChannelError();
     }
 
+    @VisibleForTesting
+    public Channel getTcpChannel() {

Review Comment:
   I think we can get the handler directly through the channel (via `channel.pipeline().get(NetworkClientHandler.class)`), so we don't need to introduce the next method.
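Netty's `ChannelPipeline#get(Class)` looks up a handler in the pipeline by its type, which is what makes the extra getter unnecessary. A dependency-free sketch of that kind of lookup; `MiniPipeline` and the handler classes are hypothetical stand-ins, not Netty types.

```java
import java.util.ArrayList;
import java.util.List;

class PipelineLookupDemo {
    public static void main(String[] args) {
        MiniPipeline pipeline = new MiniPipeline();
        pipeline.addLast(new FrameDecoderSketch());
        pipeline.addLast(new ClientHandlerSketch());
        // Look the handler up by type instead of exposing it through a getter.
        ClientHandlerSketch handler = pipeline.get(ClientHandlerSketch.class);
        System.out.println("found client handler: " + (handler != null));
    }
}

/** Hypothetical marker type for pipeline handlers. */
interface HandlerSketch {}

class FrameDecoderSketch implements HandlerSketch {}

class ClientHandlerSketch implements HandlerSketch {}

/** Hypothetical pipeline: an ordered list of handlers with a lookup by type. */
class MiniPipeline {
    private final List<HandlerSketch> handlers = new ArrayList<>();

    void addLast(HandlerSketch handler) {
        handlers.add(handler);
    }

    /** Returns the first handler that is an instance of type, or null if none matches. */
    <T extends HandlerSketch> T get(Class<T> type) {
        for (HandlerSketch handler : handlers) {
            if (type.isInstance(handler)) {
                return type.cast(handler);
            }
        }
        return null;
    }
}
```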



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java:
##########
@@ -277,6 +270,16 @@ private boolean canBeReused() {
         return clientFactory.isConnectionReuseEnabled() && !clientHandler.hasChannelError();
     }
 
+    @VisibleForTesting
+    public Channel getTcpChannel() {
+        return tcpChannel;
+    }
+
+    @VisibleForTesting

Review Comment:
   These two `@VisibleForTesting` methods should probably be avoided. For the connection reuse scenario, we are doing black box testing and should rely less on internal state to make assertions. I suggest that we consider another approach to trigger the close of the serverHandler.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java:
##########
@@ -164,6 +171,58 @@ private void checkReuseNettyPartitionRequestClient(
         assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections);
     }
 
+    /**
+     * Verify that the netty client is not reused when the netty server closes the channel and there
+     * is no input channel.
+     */
+    @TestTemplate
+    void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception {
+        NettyProtocol protocol =
+                new NettyProtocol(
+                        mock(ResultPartitionProvider.class), mock(TaskEventDispatcher.class)) {
+
+                    @Override
+                    public ChannelHandler[] getServerChannelHandlers() {
+                        return new ChannelHandler[] {
+                            // Close on read
+                            new ChannelInboundHandlerAdapter() {
+                                @Override
+                                public void channelRead(ChannelHandlerContext ctx, Object msg) {
+                                    ctx.channel().close();
+                                }
+                            }
+                        };
+                    }
+
+                    @Override
+                    public ChannelHandler[] getClientChannelHandlers() {
+                        return new ChannelHandler[] {new ChannelInactiveFutureHandler()};
+                    }
+                };
+        NettyTestUtil.NettyServerAndClient serverAndClient = initServerAndClient(protocol);
+
+        PartitionRequestClientFactory factory =
+                new PartitionRequestClientFactory(
+                        serverAndClient.client(), 2, 1, connectionReuseEnabled);
+
+        ConnectionID connectionID = serverAndClient.getConnectionID(RESOURCE_ID, 0);
+        NettyPartitionRequestClient oldClient = factory.createPartitionRequestClient(connectionID);
+
+        ChannelInactiveFutureHandler clientHandler =
+                (ChannelInactiveFutureHandler) oldClient.getClientHandler();
+        assertFalse(clientHandler.hasChannelError());

Review Comment:
   Please use AssertJ instead of JUnit assertions.





[GitHub] [flink] pnowojski merged pull request #21351: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel

Posted by GitBox <gi...@apache.org>.
pnowojski merged PR #21351:
URL: https://github.com/apache/flink/pull/21351




[GitHub] [flink] flinkbot commented on pull request #21351: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21351:
URL: https://github.com/apache/flink/pull/21351#issuecomment-1321067923

   ## CI report:
   
   * 0a43817beddb55953e9457da28315b235a8ffdae UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] reswqa commented on a diff in pull request #21351: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #21351:
URL: https://github.com/apache/flink/pull/21351#discussion_r1027320357


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java:
##########
@@ -164,6 +166,58 @@ private void checkReuseNettyPartitionRequestClient(
         assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections);
     }
 
+    /**
+     * Verify that the netty client is not reused when the netty server closes the channel and there
+     * is no input channel.
+     */
+    @TestTemplate
+    void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception {
+        CompletableFuture<Void> inactiveFuture = new CompletableFuture<>();
+        NettyProtocol protocol =
+                new NettyProtocol(null, null) {
+
+                    @Override
+                    public ChannelHandler[] getServerChannelHandlers() {
+                        return new ChannelHandler[] {
+                            // Close on read
+                            new ChannelInboundHandlerAdapter() {
+                                @Override
+                                public void channelRead(ChannelHandlerContext ctx, Object msg) {
+                                    ctx.channel().close();
+                                }
+                            }
+                        };
+                    }
+
+                    @Override
+                    public ChannelHandler[] getClientChannelHandlers() {
+                        return new ChannelHandler[] {
+                            new ChannelInactiveFutureHandler(inactiveFuture)
+                        };
+                    }
+                };
+        NettyTestUtil.NettyServerAndClient serverAndClient = initServerAndClient(protocol);
+
+        PartitionRequestClientFactory factory =
+                new PartitionRequestClientFactory(
+                        serverAndClient.client(), 2, 1, connectionReuseEnabled);
+
+        ConnectionID connectionID = serverAndClient.getConnectionID(RESOURCE_ID, 0);
+        NettyPartitionRequestClient oldClient = factory.createPartitionRequestClient(connectionID);
+
+        assertThat(oldClient.hasError()).isFalse();
+
+        // Write something to trigger close by server
+        oldClient.writeAndFlush(Unpooled.buffer().writerIndex(16));
+
+        inactiveFuture.get();
+        assertThat(oldClient.hasError()).isTrue();
+
+        NettyPartitionRequestClient newClient = factory.createPartitionRequestClient(connectionID);
+        assertThat(newClient).as("Factory should create a new client.").isNotSameAs(oldClient);
+        shutdown(serverAndClient);

Review Comment:
   Since `writeAndFlush` is only used to send a message to the `ServerHandler` to trigger the channel close, I don't think it is necessary to introduce `writeAndFlush`. Here is what I have in mind (maybe not the best way, just an example):
   ```suggestion
           CompletableFuture<Void> inactiveFuture = new CompletableFuture<>();
           CompletableFuture<Channel> serverChannelFuture = new CompletableFuture<>();
   
           NettyProtocol protocol =
                   new NettyProtocol(null, null) {
   
                       @Override
                       public ChannelHandler[] getServerChannelHandlers() {
                           return new ChannelHandler[] {
                               // Capture the server channel once it is registered
                               new ChannelInboundHandlerAdapter() {
                                   @Override
                                   public void channelRegistered(ChannelHandlerContext ctx)
                                           throws Exception {
                                       super.channelRegistered(ctx);
                                       serverChannelFuture.complete(ctx.channel());
                                   }
                               }
                           };
                       }
   
                       @Override
                       public ChannelHandler[] getClientChannelHandlers() {
                           return new ChannelHandler[] {
                               new ChannelInactiveFutureHandler(inactiveFuture)
                           };
                       }
                   };
           NettyTestUtil.NettyServerAndClient serverAndClient = initServerAndClient(protocol);
   
           PartitionRequestClientFactory factory =
                   new PartitionRequestClientFactory(
                           serverAndClient.client(), 2, 1, connectionReuseEnabled);
   
           ConnectionID connectionID = serverAndClient.getConnectionID(RESOURCE_ID, 0);
           NettyPartitionRequestClient oldClient = factory.createPartitionRequestClient(connectionID);
           
           // close server channel
           assertThat(serverChannelFuture).isDone();
           Channel channel = serverChannelFuture.get();
           channel.close();
   
           inactiveFuture.get();
   
           NettyPartitionRequestClient newClient = factory.createPartitionRequestClient(connectionID);
           assertThat(newClient).as("Factory should create a new client.").isNotSameAs(oldClient);
           shutdown(serverAndClient);
   ```
   1. The test no longer relies on the client sending messages to close the server channel.
   2. Is the assertion on `hasError` really necessary? Our test only needs to verify that the factory returns a different `NettyPartitionRequestClient`.
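AssertJ's `isNotSameAs` compares references (like `!=`), not `equals()`, which matches the intent of checking that the factory returned a different client object. A small stdlib-only sketch of the distinction; `ClientKey` is a hypothetical value class whose `equals` is overridden.

```java
import java.util.Objects;

class IdentityDemo {
    public static void main(String[] args) {
        ClientKey first = new ClientKey("taskmanager-1", 0);
        ClientKey second = new ClientKey("taskmanager-1", 0);
        // Equal by value...
        System.out.println("equals: " + first.equals(second));
        // ...but two distinct objects, which is what isNotSameAs checks.
        System.out.println("different instances: " + (first != second));
    }
}

/** Hypothetical value class with value-based equality. */
final class ClientKey {
    private final String host;
    private final int index;

    ClientKey(String host, int index) {
        this.host = host;
        this.index = index;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ClientKey)) {
            return false;
        }
        ClientKey other = (ClientKey) o;
        return index == other.index && host.equals(other.host);
    }

    @Override
    public int hashCode() {
        return Objects.hash(host, index);
    }
}
```

If a type overrides `equals` like this, an `isNotEqualTo` check would fail on two distinct instances, whereas an identity check still passes.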
   


