You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/21 18:49:13 UTC
[flink] branch release-1.16 updated: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new ecede7ab8ed [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel
ecede7ab8ed is described below
commit ecede7ab8ed5fa5be667a9801a8a2ee5ead4a043
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Sun Nov 20 15:57:58 2022 +0800
[FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel
---
.../CreditBasedPartitionRequestClientHandler.java | 22 +++----
.../netty/PartitionRequestClientFactoryTest.java | 76 ++++++++++++++++++++++
2 files changed, 85 insertions(+), 13 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 00d8e6e03f8..fe008c1afed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -119,19 +119,15 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- // Unexpected close. In normal operation, the client closes the connection after all input
- // channels have been removed. This indicates a problem with the remote task manager.
- if (!inputChannels.isEmpty()) {
- final SocketAddress remoteAddr = ctx.channel().remoteAddress();
-
- notifyAllChannelsOfErrorAndClose(
- new RemoteTransportException(
- "Connection unexpectedly closed by remote task manager '"
- + remoteAddr
- + "'. "
- + "This might indicate that the remote task manager was lost.",
- remoteAddr));
- }
+ final SocketAddress remoteAddr = ctx.channel().remoteAddress();
+
+ notifyAllChannelsOfErrorAndClose(
+ new RemoteTransportException(
+ "Connection unexpectedly closed by remote task manager '"
+ + remoteAddr
+ + "'. "
+ + "This might indicate that the remote task manager was lost.",
+ remoteAddr));
super.channelInactive(ctx);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index d2b57c5409d..968b824d541 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -23,9 +23,12 @@ import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -43,6 +46,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -158,6 +164,56 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
assertTrue(set.size() <= maxNumberOfConnections);
}
+ /**
+ * Verify that the factory creates a new netty client, instead of reusing the old one, when the
+ * netty server closes the channel and there is no input channel.
+ */
+ @Test
+ public void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception {
+ CompletableFuture<Void> inactiveFuture = new CompletableFuture<>();
+ CompletableFuture<Channel> serverChannelFuture = new CompletableFuture<>();
+ NettyProtocol protocol =
+ new NettyProtocol(null, null) {
+ @Override
+ public ChannelHandler[] getServerChannelHandlers() {
+ return new ChannelHandler[] {
+ // Capture the server-side channel on registration so the test can close it later
+ new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelRegistered(ChannelHandlerContext ctx)
+ throws Exception {
+ super.channelRegistered(ctx);
+ serverChannelFuture.complete(ctx.channel());
+ }
+ }
+ };
+ }
+
+ @Override
+ public ChannelHandler[] getClientChannelHandlers() {
+ return new ChannelHandler[] {
+ new ChannelInactiveFutureHandler(inactiveFuture)
+ };
+ }
+ };
+ NettyTestUtil.NettyServerAndClient serverAndClient = initServerAndClient(protocol);
+
+ PartitionRequestClientFactory factory =
+ new PartitionRequestClientFactory(
+ serverAndClient.client(), 2, 1, connectionReuseEnabled);
+
+ ConnectionID connectionID = serverAndClient.getConnectionID(0);
+ NettyPartitionRequestClient oldClient = factory.createPartitionRequestClient(connectionID);
+
+ // close server channel
+ Channel channel = serverChannelFuture.get();
+ channel.close();
+ inactiveFuture.get();
+ NettyPartitionRequestClient newClient = factory.createPartitionRequestClient(connectionID);
+ assertThat(newClient).as("Factory should create a new client.").isNotSameAs(oldClient);
+ shutdown(serverAndClient);
+ }
+
@Test
public void testNettyClientConnectRetry() throws Exception {
NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient();
@@ -329,4 +385,24 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
}
}
}
+
+ private static class ChannelInactiveFutureHandler
+ extends CreditBasedPartitionRequestClientHandler {
+
+ private final CompletableFuture<Void> inactiveFuture;
+
+ private ChannelInactiveFutureHandler(CompletableFuture<Void> inactiveFuture) {
+ this.inactiveFuture = inactiveFuture;
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ super.channelInactive(ctx);
+ inactiveFuture.complete(null);
+ }
+
+ public CompletableFuture<Void> getInactiveFuture() {
+ return inactiveFuture;
+ }
+ }
}