You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/21 18:49:13 UTC

[flink] branch release-1.16 updated: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new ecede7ab8ed [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel
ecede7ab8ed is described below

commit ecede7ab8ed5fa5be667a9801a8a2ee5ead4a043
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Sun Nov 20 15:57:58 2022 +0800

    [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel
---
 .../CreditBasedPartitionRequestClientHandler.java  | 22 +++----
 .../netty/PartitionRequestClientFactoryTest.java   | 76 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 13 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 00d8e6e03f8..fe008c1afed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -119,19 +119,15 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-        // Unexpected close. In normal operation, the client closes the connection after all input
-        // channels have been removed. This indicates a problem with the remote task manager.
-        if (!inputChannels.isEmpty()) {
-            final SocketAddress remoteAddr = ctx.channel().remoteAddress();
-
-            notifyAllChannelsOfErrorAndClose(
-                    new RemoteTransportException(
-                            "Connection unexpectedly closed by remote task manager '"
-                                    + remoteAddr
-                                    + "'. "
-                                    + "This might indicate that the remote task manager was lost.",
-                            remoteAddr));
-        }
+        final SocketAddress remoteAddr = ctx.channel().remoteAddress();
+
+        notifyAllChannelsOfErrorAndClose(
+                new RemoteTransportException(
+                        "Connection unexpectedly closed by remote task manager '"
+                                + remoteAddr
+                                + "'. "
+                                + "This might indicate that the remote task manager was lost.",
+                        remoteAddr));
 
         super.channelInactive(ctx);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index d2b57c5409d..968b824d541 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -23,9 +23,12 @@ import org.apache.flink.runtime.io.network.NetworkClientHandler;
 import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -43,6 +46,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -158,6 +164,56 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
         assertTrue(set.size() <= maxNumberOfConnections);
     }
 
+    /**
+     * Verify that a new netty client is created, instead of the old one being reused, after the
+     * netty server closes the channel while there is no input channel.
+     */
+    @Test
+    public void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception {
+        CompletableFuture<Void> inactiveFuture = new CompletableFuture<>();
+        CompletableFuture<Channel> serverChannelFuture = new CompletableFuture<>();
+        NettyProtocol protocol =
+                new NettyProtocol(null, null) {
+                    @Override
+                    public ChannelHandler[] getServerChannelHandlers() {
+                        return new ChannelHandler[] {
+                            // Expose the server-side channel so the test can close it
+                            new ChannelInboundHandlerAdapter() {
+                                @Override
+                                public void channelRegistered(ChannelHandlerContext ctx)
+                                        throws Exception {
+                                    super.channelRegistered(ctx);
+                                    serverChannelFuture.complete(ctx.channel());
+                                }
+                            }
+                        };
+                    }
+
+                    @Override
+                    public ChannelHandler[] getClientChannelHandlers() {
+                        return new ChannelHandler[] {
+                            new ChannelInactiveFutureHandler(inactiveFuture)
+                        };
+                    }
+                };
+        NettyTestUtil.NettyServerAndClient serverAndClient = initServerAndClient(protocol);
+
+        PartitionRequestClientFactory factory =
+                new PartitionRequestClientFactory(
+                        serverAndClient.client(), 2, 1, connectionReuseEnabled);
+
+        ConnectionID connectionID = serverAndClient.getConnectionID(0);
+        NettyPartitionRequestClient oldClient = factory.createPartitionRequestClient(connectionID);
+
+        // close server channel
+        Channel channel = serverChannelFuture.get();
+        channel.close();
+        inactiveFuture.get();
+        NettyPartitionRequestClient newClient = factory.createPartitionRequestClient(connectionID);
+        assertThat(newClient).as("Factory should create a new client.").isNotSameAs(oldClient);
+        shutdown(serverAndClient);
+    }
+
     @Test
     public void testNettyClientConnectRetry() throws Exception {
         NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient();
@@ -329,4 +385,24 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
             }
         }
     }
+
+    private static class ChannelInactiveFutureHandler
+            extends CreditBasedPartitionRequestClientHandler {
+
+        private final CompletableFuture<Void> inactiveFuture;
+
+        private ChannelInactiveFutureHandler(CompletableFuture<Void> inactiveFuture) {
+            this.inactiveFuture = inactiveFuture;
+        }
+
+        @Override
+        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+            super.channelInactive(ctx);
+            inactiveFuture.complete(null);
+        }
+
+        public CompletableFuture<Void> getInactiveFuture() {
+            return inactiveFuture;
+        }
+    }
 }