Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/20 08:13:27 UTC
[GitHub] [flink] 1996fanrui opened a new pull request, #21351: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel
1996fanrui opened a new pull request, #21351:
URL: https://github.com/apache/flink/pull/21351
## What is the purpose of the change
Fix the bug where the old Netty client isn't closed when the Netty server closes the channel and there is no input channel.
`CreditBasedPartitionRequestClientHandler#channelInactive` doesn't call `notifyAllChannelsOfErrorAndClose` when `inputChannels.isEmpty()` is true:
https://github.com/apache/flink/blob/d745f5b3f7a64445854c735668afa9b72edb3fee/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java#L126
![image](https://user-images.githubusercontent.com/38427477/202892039-7692b054-e502-4b3f-9de6-4f67c6b04f6f.png)
FLINK-28695 enables TCP connection reuse across multiple jobs. This means that after the old job stops and before the new job starts, there are no input channels, so `CreditBasedPartitionRequestClientHandler#channelInactive` doesn't call `notifyAllChannelsOfErrorAndClose`. If the TCP connection is disconnected during this period, the new job should create a new TCP connection. However, `PartitionRequestClientFactory` doesn't know that the TCP connection is broken, so the new job reuses the failed connection.
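The failure mode above can be illustrated with a small, self-contained Java model. `Handler`, `Client` and `Factory` are hypothetical stand-ins, not Flink classes: the handler skips recording an error when it has no input channels, and the factory reuses any cached client whose handler reports no error, so a connection that died between jobs is handed out again.

```java
import java.util.HashMap;
import java.util.Map;

public class StaleReuseDemo {
    // Hypothetical stand-in for the client-side handler. Like the behavior described
    // above, it only records an error when there are input channels to notify.
    static final class Handler {
        int inputChannels = 0; // no input channels: the old job has finished
        boolean channelError = false;

        void channelInactive() {
            if (inputChannels == 0) {
                return; // early exit: the error is never recorded
            }
            channelError = true;
        }
    }

    // Hypothetical client: reusable only while its handler reports no error.
    static final class Client {
        final Handler handler = new Handler();

        boolean canBeReused() {
            return !handler.channelError;
        }
    }

    // Hypothetical factory caching one client per connection id.
    static final class Factory {
        private final Map<String, Client> clients = new HashMap<>();

        Client create(String connectionId) {
            Client cached = clients.get(connectionId);
            if (cached != null && cached.canBeReused()) {
                return cached;
            }
            Client fresh = new Client();
            clients.put(connectionId, fresh);
            return fresh;
        }
    }

    public static void main(String[] args) {
        Factory factory = new Factory();
        Client oldClient = factory.create("tm-1");
        oldClient.handler.channelInactive(); // remote side closed the TCP connection between jobs
        Client newClient = factory.create("tm-1");
        System.out.println(newClient == oldClient); // true: the broken connection is reused
    }
}
```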
## Brief change log
Close the old Netty client when the Netty server closes the channel and there are no input channels.
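A minimal sketch of the intended behavior, assuming the fix amounts to marking the client as broken whenever its channel becomes inactive, regardless of how many input channels are registered. The classes are hypothetical stand-ins and the merged Flink change may differ in detail; the reuse check only mirrors the shape of `canBeReused()` in `NettyPartitionRequestClient`, which consults `hasChannelError()`.

```java
import java.util.HashMap;
import java.util.Map;

public class InactiveMarksErrorDemo {
    // Hypothetical handler: records the error whenever the channel goes inactive.
    static final class Handler {
        int inputChannels = 0; // no input channels: the old job has finished
        boolean channelError = false;

        void channelInactive() {
            // Record the error unconditionally, even when inputChannels == 0, so that
            // the owning client is no longer considered reusable.
            channelError = true;
            // Notifying any registered input channels is omitted in this sketch.
        }
    }

    static final class Client {
        final Handler handler = new Handler();

        // Reusable only while no channel error has been recorded.
        boolean canBeReused() {
            return !handler.channelError;
        }
    }

    // Hypothetical factory caching one client per connection id.
    static final class Factory {
        private final Map<String, Client> clients = new HashMap<>();

        Client create(String connectionId) {
            Client cached = clients.get(connectionId);
            if (cached != null && cached.canBeReused()) {
                return cached;
            }
            Client fresh = new Client(); // replaces the broken client
            clients.put(connectionId, fresh);
            return fresh;
        }
    }

    public static void main(String[] args) {
        Factory factory = new Factory();
        Client oldClient = factory.create("tm-1");
        oldClient.handler.channelInactive(); // remote side closed the connection between jobs
        Client newClient = factory.create("tm-1");
        System.out.println(newClient == oldClient); // false: a fresh client is created
    }
}
```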
## Verifying this change
This change added tests and can be verified as follows:
- *PartitionRequestClientFactoryTest#testConnectionReuseWhenRemoteCloseAndNoInputChannel*
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not documented
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] 1996fanrui commented on a diff in pull request #21351: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel
Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #21351:
URL: https://github.com/apache/flink/pull/21351#discussion_r1027398763
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java:
##########
@@ -164,6 +166,58 @@ private void checkReuseNettyPartitionRequestClient(
assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections);
}
+ /**
+ * Verify that the netty client reuse when the netty server closes the channel and there is no
+ * input channel.
+ */
+ @TestTemplate
+ void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception {
+ CompletableFuture<Void> inactiveFuture = new CompletableFuture<>();
+ NettyProtocol protocol =
+ new NettyProtocol(null, null) {
+
+ @Override
+ public ChannelHandler[] getServerChannelHandlers() {
+ return new ChannelHandler[] {
+ // Close on read
+ new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ ctx.channel().close();
+ }
+ }
+ };
+ }
+
+ @Override
+ public ChannelHandler[] getClientChannelHandlers() {
+ return new ChannelHandler[] {
+ new ChannelInactiveFutureHandler(inactiveFuture)
+ };
+ }
+ };
+ NettyTestUtil.NettyServerAndClient serverAndClient = initServerAndClient(protocol);
+
+ PartitionRequestClientFactory factory =
+ new PartitionRequestClientFactory(
+ serverAndClient.client(), 2, 1, connectionReuseEnabled);
+
+ ConnectionID connectionID = serverAndClient.getConnectionID(RESOURCE_ID, 0);
+ NettyPartitionRequestClient oldClient = factory.createPartitionRequestClient(connectionID);
+
+ assertThat(oldClient.hasError()).isFalse();
+
+ // Write something to trigger close by server
+ oldClient.writeAndFlush(Unpooled.buffer().writerIndex(16));
+
+ inactiveFuture.get();
+ assertThat(oldClient.hasError()).isTrue();
+
+ NettyPartitionRequestClient newClient = factory.createPartitionRequestClient(connectionID);
+ assertThat(newClient).as("Factory should create a new client.").isNotSameAs(oldClient);
+ shutdown(serverAndClient);
Review Comment:
Thanks for the great suggestion; it makes sense, and I've updated the code.
BTW, I removed `assertThat(serverChannelFuture).isDone();` because `channelRegistered` is not called synchronously, so the unit test sometimes failed.
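The flakiness comes from asserting on state that another thread produces. The sketch below is a plain-Java model, not Netty code: a single-thread executor stands in for the event loop, and the hypothetical `registerLater` completes a future some time after the connection is set up, the way a `channelRegistered` callback would. Checking `isDone()` immediately may observe either outcome, while blocking with a timeout is deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncCallbackWait {
    // Hypothetical stand-in for a server-side callback (like channelRegistered)
    // that fires on an event-loop thread some time after the client connects.
    static CompletableFuture<String> registerLater(ExecutorService eventLoop) {
        CompletableFuture<String> registered = new CompletableFuture<>();
        eventLoop.submit(() -> {
            try {
                Thread.sleep(50); // simulate scheduling delay on the event loop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            registered.complete("server-channel");
        });
        return registered;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> future = registerLater(eventLoop);
            // Asserting future.isDone() here would be racy: the callback may not have run yet.
            String channel = future.get(5, TimeUnit.SECONDS); // wait for the callback instead
            System.out.println(channel);
        } finally {
            eventLoop.shutdownNow();
        }
    }
}
```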
[GitHub] [flink] 1996fanrui commented on pull request #21351: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel
Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #21351:
URL: https://github.com/apache/flink/pull/21351#issuecomment-1321861598
Thanks @reswqa and @pnowojski for the review.
[GitHub] [flink] 1996fanrui commented on pull request #21351: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel
Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #21351:
URL: https://github.com/apache/flink/pull/21351#issuecomment-1321165059
> @1996fanrui Thanks for your contribution, I just left some comments, PTAL~
Hi @reswqa, thanks a lot for your review, especially over the weekend.
I have addressed all comments.
> These two @VisibleForTesting methods should probably be avoided. For the connection reuse scenario, we are doing black box testing and should rely less on internal state to make assertions. I suggest that we consider another approach to trigger the close of the serverHandler.
I now use `void writeAndFlush(Object msg)` and `boolean hasError()` instead of `Channel getTcpChannel()` and `NetworkClientHandler getClientHandler()`. Compared with the previous version, this exposes behavior through the client's interface rather than its internal details.
WDYT?
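The trade-off can be sketched in plain Java with hypothetical classes (not Flink's): the test observes the client through a narrow method, `hasError()`, instead of obtaining the handler and inspecting it, so the handler's internals can change without breaking the test.

```java
public class NarrowClientApiDemo {
    // Hypothetical internal handler: an implementation detail tests should not reach into.
    static final class Handler {
        private volatile boolean channelError;

        void channelInactive() {
            channelError = true;
        }

        boolean hasChannelError() {
            return channelError;
        }
    }

    // Hypothetical client: exposes hasError() rather than a getClientHandler() accessor.
    static final class Client {
        private final Handler handler = new Handler();

        boolean hasError() {
            return handler.hasChannelError();
        }

        // In this sketch only: simulates the remote side closing the channel.
        void onRemoteClose() {
            handler.channelInactive();
        }
    }

    public static void main(String[] args) {
        Client client = new Client();
        System.out.println(client.hasError()); // false
        client.onRemoteClose();
        System.out.println(client.hasError()); // true
    }
}
```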
[GitHub] [flink] 1996fanrui commented on pull request #21351: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel
Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #21351:
URL: https://github.com/apache/flink/pull/21351#issuecomment-1321124984
@flinkbot run azure
[GitHub] [flink] reswqa commented on a diff in pull request #21351: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel
Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #21351:
URL: https://github.com/apache/flink/pull/21351#discussion_r1027297678
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java:
##########
@@ -164,6 +171,58 @@ private void checkReuseNettyPartitionRequestClient(
assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections);
}
+ /**
+ * Verify that the netty client reuse when the netty server closes the channel and there is no
+ * input channel.
+ */
+ @TestTemplate
+ void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception {
+ NettyProtocol protocol =
+ new NettyProtocol(
+ mock(ResultPartitionProvider.class), mock(TaskEventDispatcher.class)) {
Review Comment:
We should not introduce mocks into the test class. Please refer to [Avoid Mockito](https://flink.apache.org/contributing/code-style-and-quality-common.html).
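Where a test really needs a collaborator, the usual Mockito-free alternative is a small hand-written test double. The interface and classes below are hypothetical and only illustrate the pattern; the revised test in this PR sidesteps the issue by passing `null` to the `NettyProtocol` constructor, since its handlers never touch those collaborators.

```java
import java.util.ArrayList;
import java.util.List;

public class HandWrittenStubDemo {
    // Hypothetical collaborator interface (a stand-in for something like TaskEventDispatcher).
    interface EventDispatcher {
        boolean publish(String event);
    }

    // Hand-written test double instead of mock(EventDispatcher.class): it records calls
    // and returns a fixed answer, with no mocking framework involved.
    static final class RecordingDispatcher implements EventDispatcher {
        final List<String> published = new ArrayList<>();

        @Override
        public boolean publish(String event) {
            published.add(event);
            return true;
        }
    }

    // Hypothetical code under test.
    static int publishAll(EventDispatcher dispatcher, List<String> events) {
        int accepted = 0;
        for (String event : events) {
            if (dispatcher.publish(event)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        RecordingDispatcher dispatcher = new RecordingDispatcher();
        int accepted = publishAll(dispatcher, List.of("a", "b"));
        System.out.println(accepted + " " + dispatcher.published); // 2 [a, b]
    }
}
```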
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java:
##########
@@ -277,6 +270,16 @@ private boolean canBeReused() {
return clientFactory.isConnectionReuseEnabled() && !clientHandler.hasChannelError();
}
+ @VisibleForTesting
+ public Channel getTcpChannel() {
Review Comment:
I think we can get the handler directly through the channel (via `channel.pipeline().get(NetworkClientHandler.class)`), so we don't need to introduce the next method.
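Netty's `ChannelPipeline#get(Class)` returns the first handler in the pipeline whose type matches the given class, or `null` if there is none. The stdlib-only model below mimics that lookup with hypothetical stand-in types (it does not use Netty itself), to show why no extra getter on the client is needed once the channel is available.

```java
import java.util.ArrayList;
import java.util.List;

public class PipelineLookupDemo {
    interface ChannelHandler {}

    // Hypothetical stand-ins for the Flink types named in the review comment.
    interface NetworkClientHandler extends ChannelHandler {}

    static final class CreditBasedHandler implements NetworkClientHandler {}

    static final class LoggingHandler implements ChannelHandler {}

    // Minimal model of a pipeline: an ordered list of handlers with lookup by type.
    static final class Pipeline {
        private final List<ChannelHandler> handlers = new ArrayList<>();

        Pipeline addLast(ChannelHandler handler) {
            handlers.add(handler);
            return this;
        }

        // Same idea as ChannelPipeline#get(Class): first handler assignable to type, else null.
        <T extends ChannelHandler> T get(Class<T> type) {
            for (ChannelHandler handler : handlers) {
                if (type.isInstance(handler)) {
                    return type.cast(handler);
                }
            }
            return null;
        }
    }

    public static void main(String[] args) {
        CreditBasedHandler clientHandler = new CreditBasedHandler();
        Pipeline pipeline = new Pipeline().addLast(new LoggingHandler()).addLast(clientHandler);
        NetworkClientHandler found = pipeline.get(NetworkClientHandler.class);
        System.out.println(found == clientHandler); // true
    }
}
```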
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java:
##########
@@ -277,6 +270,16 @@ private boolean canBeReused() {
return clientFactory.isConnectionReuseEnabled() && !clientHandler.hasChannelError();
}
+ @VisibleForTesting
+ public Channel getTcpChannel() {
+ return tcpChannel;
+ }
+
+ @VisibleForTesting
Review Comment:
These two `@VisibleForTesting` methods should probably be avoided. For the connection reuse scenario, we are doing black box testing and should rely less on internal state to make assertions. I suggest that we consider another approach to trigger the close of the serverHandler.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java:
##########
@@ -164,6 +171,58 @@ private void checkReuseNettyPartitionRequestClient(
assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections);
}
+ /**
+ * Verify that the netty client reuse when the netty server closes the channel and there is no
+ * input channel.
+ */
+ @TestTemplate
+ void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception {
+ NettyProtocol protocol =
+ new NettyProtocol(
+ mock(ResultPartitionProvider.class), mock(TaskEventDispatcher.class)) {
+
+ @Override
+ public ChannelHandler[] getServerChannelHandlers() {
+ return new ChannelHandler[] {
+ // Close on read
+ new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ ctx.channel().close();
+ }
+ }
+ };
+ }
+
+ @Override
+ public ChannelHandler[] getClientChannelHandlers() {
+ return new ChannelHandler[] {new ChannelInactiveFutureHandler()};
+ }
+ };
+ NettyTestUtil.NettyServerAndClient serverAndClient = initServerAndClient(protocol);
+
+ PartitionRequestClientFactory factory =
+ new PartitionRequestClientFactory(
+ serverAndClient.client(), 2, 1, connectionReuseEnabled);
+
+ ConnectionID connectionID = serverAndClient.getConnectionID(RESOURCE_ID, 0);
+ NettyPartitionRequestClient oldClient = factory.createPartitionRequestClient(connectionID);
+
+ ChannelInactiveFutureHandler clientHandler =
+ (ChannelInactiveFutureHandler) oldClient.getClientHandler();
+ assertFalse(clientHandler.hasChannelError());
Review Comment:
Please use AssertJ instead of JUnit assertions.
[GitHub] [flink] pnowojski merged pull request #21351: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel
Posted by GitBox <gi...@apache.org>.
pnowojski merged PR #21351:
URL: https://github.com/apache/flink/pull/21351
[GitHub] [flink] flinkbot commented on pull request #21351: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel
Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21351:
URL: https://github.com/apache/flink/pull/21351#issuecomment-1321067923
## CI report:
* 0a43817beddb55953e9457da28315b235a8ffdae UNKNOWN
<details>
<summary>Bot commands</summary>
The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
</details>
[GitHub] [flink] reswqa commented on a diff in pull request #21351: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel
Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #21351:
URL: https://github.com/apache/flink/pull/21351#discussion_r1027320357
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java:
##########
@@ -164,6 +166,58 @@ private void checkReuseNettyPartitionRequestClient(
assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections);
}
+ /**
+ * Verify that the netty client reuse when the netty server closes the channel and there is no
+ * input channel.
+ */
+ @TestTemplate
+ void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception {
+ CompletableFuture<Void> inactiveFuture = new CompletableFuture<>();
+ NettyProtocol protocol =
+ new NettyProtocol(null, null) {
+
+ @Override
+ public ChannelHandler[] getServerChannelHandlers() {
+ return new ChannelHandler[] {
+ // Close on read
+ new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ ctx.channel().close();
+ }
+ }
+ };
+ }
+
+ @Override
+ public ChannelHandler[] getClientChannelHandlers() {
+ return new ChannelHandler[] {
+ new ChannelInactiveFutureHandler(inactiveFuture)
+ };
+ }
+ };
+ NettyTestUtil.NettyServerAndClient serverAndClient = initServerAndClient(protocol);
+
+ PartitionRequestClientFactory factory =
+ new PartitionRequestClientFactory(
+ serverAndClient.client(), 2, 1, connectionReuseEnabled);
+
+ ConnectionID connectionID = serverAndClient.getConnectionID(RESOURCE_ID, 0);
+ NettyPartitionRequestClient oldClient = factory.createPartitionRequestClient(connectionID);
+
+ assertThat(oldClient.hasError()).isFalse();
+
+ // Write something to trigger close by server
+ oldClient.writeAndFlush(Unpooled.buffer().writerIndex(16));
+
+ inactiveFuture.get();
+ assertThat(oldClient.hasError()).isTrue();
+
+ NettyPartitionRequestClient newClient = factory.createPartitionRequestClient(connectionID);
+ assertThat(newClient).as("Factory should create a new client.").isNotSameAs(oldClient);
+ shutdown(serverAndClient);
Review Comment:
Since `writeAndFlush` is only used to send a message to the `ServerHandler` to trigger the channel close, I don't think it is necessary to introduce `writeAndFlush`. Here is the change I have in mind (maybe not the best way, just an example):
```suggestion
CompletableFuture<Void> inactiveFuture = new CompletableFuture<>();
CompletableFuture<Channel> serverChannelFuture = new CompletableFuture<>();
NettyProtocol protocol =
new NettyProtocol(null, null) {
@Override
public ChannelHandler[] getServerChannelHandlers() {
return new ChannelHandler[] {
// Capture the server-side channel once it is registered
new ChannelInboundHandlerAdapter() {
@Override
public void channelRegistered(ChannelHandlerContext ctx)
throws Exception {
super.channelRegistered(ctx);
serverChannelFuture.complete(ctx.channel());
}
}
};
}
@Override
public ChannelHandler[] getClientChannelHandlers() {
return new ChannelHandler[] {
new ChannelInactiveFutureHandler(inactiveFuture)
};
}
};
NettyTestUtil.NettyServerAndClient serverAndClient = initServerAndClient(protocol);
PartitionRequestClientFactory factory =
new PartitionRequestClientFactory(
serverAndClient.client(), 2, 1, connectionReuseEnabled);
ConnectionID connectionID = serverAndClient.getConnectionID(RESOURCE_ID, 0);
NettyPartitionRequestClient oldClient = factory.createPartitionRequestClient(connectionID);
// close server channel
assertThat(serverChannelFuture).isDone();
Channel channel = serverChannelFuture.get();
channel.close();
inactiveFuture.get();
NettyPartitionRequestClient newClient = factory.createPartitionRequestClient(connectionID);
assertThat(newClient).as("Factory should create a new client.").isNotSameAs(oldClient);
shutdown(serverAndClient);
```
1. No longer rely on the client to send messages to close the server channel.
2. Is the `hasError` assertion really necessary? The test only needs to verify that the factory returns a different `NettyPartitionRequestClient`.
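The proposed test flow (close from the server side, wait for the client's inactive future, then compare clients) can be modeled in plain Java. Everything here is hypothetical: a single-thread executor plays the event loop, and the client records its error before completing the future, so the state is visible to the test thread once `get()` returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ServerCloseReuseCheck {
    // Hypothetical client whose inactive callback runs on the event-loop thread.
    static final class Client {
        private volatile boolean channelError;
        final CompletableFuture<Void> inactiveFuture = new CompletableFuture<>();

        void channelInactive() {
            channelError = true;           // record the error first ...
            inactiveFuture.complete(null); // ... then signal any waiting test thread
        }

        boolean canBeReused() {
            return !channelError;
        }
    }

    // Hypothetical single-connection factory: reuse the cached client only if it is healthy.
    static final class Factory {
        private Client cached;

        synchronized Client create() {
            if (cached == null || !cached.canBeReused()) {
                cached = new Client();
            }
            return cached;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            Factory factory = new Factory();
            Client oldClient = factory.create();
            // Server-side close: the inactive callback is delivered asynchronously.
            eventLoop.submit(oldClient::channelInactive);
            oldClient.inactiveFuture.get(5, TimeUnit.SECONDS); // wait rather than poll isDone()
            Client newClient = factory.create();
            System.out.println(newClient != oldClient); // true
        } finally {
            eventLoop.shutdown();
        }
    }
}
```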