You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/21 10:45:07 UTC

[flink] branch master updated (8680caea9ad -> e7854193816)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 8680caea9ad [FLINK-30089][kinesis] Remove dependency promotion
     new 4b10f422f20 [FLINK-28695][hotfix][network] Remove some unused exceptions
     new d4a50a7467e [FLINK-28695][refactor][network] Refactor the nettyServerAndClient shutdown and some lambdas
     new e7854193816 [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../CreditBasedPartitionRequestClientHandler.java  |  30 +++---
 .../network/netty/NettyPartitionRequestClient.java |  98 +++++++++-----------
 .../netty/PartitionRequestClientFactoryTest.java   | 102 +++++++++++++++++----
 3 files changed, 143 insertions(+), 87 deletions(-)


[flink] 02/03: [FLINK-28695][refactor][network] Refactor the nettyServerAndClient shutdown and some lambdas

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d4a50a7467ea64840c4a24f5291613d2f7a3e1d5
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Sun Nov 20 15:37:44 2022 +0800

    [FLINK-28695][refactor][network] Refactor the nettyServerAndClient shutdown and some lambdas
---
 .../network/netty/NettyPartitionRequestClient.java | 98 ++++++++++------------
 .../netty/PartitionRequestClientFactoryTest.java   | 26 +++---
 2 files changed, 55 insertions(+), 69 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
index 2bfa28b6040..cbd823c134c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
@@ -131,29 +131,26 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
                         inputChannel.getInitialCredit());
 
         final ChannelFutureListener listener =
-                new ChannelFutureListener() {
-                    @Override
-                    public void operationComplete(ChannelFuture future) {
-                        if (!future.isSuccess()) {
-                            clientHandler.removeInputChannel(inputChannel);
-                            inputChannel.onError(
-                                    new LocalTransportException(
-                                            String.format(
-                                                    "Sending the partition request to '%s [%s] (#%d)' failed.",
-                                                    connectionId.getAddress(),
-                                                    connectionId
-                                                            .getResourceID()
-                                                            .getStringWithMetadata(),
-                                                    connectionId.getConnectionIndex()),
-                                            future.channel().localAddress(),
-                                            future.cause()));
-                            sendToChannel(
-                                    new ConnectionErrorMessage(
-                                            future.cause() == null
-                                                    ? new RuntimeException(
-                                                            "Cannot send partition request.")
-                                                    : future.cause()));
-                        }
+                future -> {
+                    if (!future.isSuccess()) {
+                        clientHandler.removeInputChannel(inputChannel);
+                        inputChannel.onError(
+                                new LocalTransportException(
+                                        String.format(
+                                                "Sending the partition request to '%s [%s] (#%d)' failed.",
+                                                connectionId.getAddress(),
+                                                connectionId
+                                                        .getResourceID()
+                                                        .getStringWithMetadata(),
+                                                connectionId.getConnectionIndex()),
+                                        future.channel().localAddress(),
+                                        future.cause()));
+                        sendToChannel(
+                                new ConnectionErrorMessage(
+                                        future.cause() == null
+                                                ? new RuntimeException(
+                                                        "Cannot send partition request.")
+                                                : future.cause()));
                     }
                 };
 
@@ -165,12 +162,9 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
             tcpChannel
                     .eventLoop()
                     .schedule(
-                            new Runnable() {
-                                @Override
-                                public void run() {
-                                    f[0] = tcpChannel.writeAndFlush(request);
-                                    f[0].addListener(listener);
-                                }
+                            () -> {
+                                f[0] = tcpChannel.writeAndFlush(request);
+                                f[0].addListener(listener);
                             },
                             delayMs,
                             TimeUnit.MILLISECONDS);
@@ -194,30 +188,28 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
                 .writeAndFlush(
                         new TaskEventRequest(event, partitionId, inputChannel.getInputChannelId()))
                 .addListener(
-                        new ChannelFutureListener() {
-                            @Override
-                            public void operationComplete(ChannelFuture future) {
-                                if (!future.isSuccess()) {
-                                    inputChannel.onError(
-                                            new LocalTransportException(
-                                                    String.format(
-                                                            "Sending the task event to '%s [%s] (#%d)' failed.",
-                                                            connectionId.getAddress(),
-                                                            connectionId
-                                                                    .getResourceID()
-                                                                    .getStringWithMetadata(),
-                                                            connectionId.getConnectionIndex()),
-                                                    future.channel().localAddress(),
-                                                    future.cause()));
-                                    sendToChannel(
-                                            new ConnectionErrorMessage(
-                                                    future.cause() == null
-                                                            ? new RuntimeException(
-                                                                    "Cannot send task event.")
-                                                            : future.cause()));
-                                }
-                            }
-                        });
+                        (ChannelFutureListener)
+                                future -> {
+                                    if (!future.isSuccess()) {
+                                        inputChannel.onError(
+                                                new LocalTransportException(
+                                                        String.format(
+                                                                "Sending the task event to '%s [%s] (#%d)' failed.",
+                                                                connectionId.getAddress(),
+                                                                connectionId
+                                                                        .getResourceID()
+                                                                        .getStringWithMetadata(),
+                                                                connectionId.getConnectionIndex()),
+                                                        future.channel().localAddress(),
+                                                        future.cause()));
+                                        sendToChannel(
+                                                new ConnectionErrorMessage(
+                                                        future.cause() == null
+                                                                ? new RuntimeException(
+                                                                        "Cannot send task event.")
+                                                                : future.cause()));
+                                    }
+                                });
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index f8ba8118a77..56110d95cbc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -48,6 +48,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -81,8 +82,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
             factory.createPartitionRequestClient(
                     nettyServerAndClient.getConnectionID(RESOURCE_ID, 0));
         } finally {
-            nettyServerAndClient.client().shutdown();
-            nettyServerAndClient.server().shutdown();
+            shutdown(nettyServerAndClient);
         }
     }
 
@@ -125,8 +125,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
 
             factory.createPartitionRequestClient(connectionID);
         } finally {
-            nettyServerAndClient.client().shutdown();
-            nettyServerAndClient.server().shutdown();
+            shutdown(nettyServerAndClient);
         }
     }
 
@@ -139,8 +138,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
             checkReuseNettyPartitionRequestClient(nettyServerAndClient, 5);
             checkReuseNettyPartitionRequestClient(nettyServerAndClient, 10);
         } finally {
-            nettyServerAndClient.client().shutdown();
-            nettyServerAndClient.server().shutdown();
+            shutdown(nettyServerAndClient);
         }
     }
 
@@ -176,8 +174,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
 
         factory.createPartitionRequestClient(serverAndClient.getConnectionID(RESOURCE_ID, 0));
 
-        serverAndClient.client().shutdown();
-        serverAndClient.server().shutdown();
+        shutdown(serverAndClient);
     }
 
     // see https://issues.apache.org/jira/browse/FLINK-18821
@@ -218,14 +215,12 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                             unstableNettyClient, 2, 1, connectionReuseEnabled);
 
             assertThatThrownBy(
-                            () -> {
-                                factory.createPartitionRequestClient(
-                                        serverAndClient.getConnectionID(RESOURCE_ID, 0));
-                            })
+                            () ->
+                                    factory.createPartitionRequestClient(
+                                            serverAndClient.getConnectionID(RESOURCE_ID, 0)))
                     .isInstanceOf(IOException.class);
         } finally {
-            serverAndClient.client().shutdown();
-            serverAndClient.server().shutdown();
+            shutdown(serverAndClient);
         }
     }
 
@@ -274,8 +269,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                 });
 
         threadPoolExecutor.shutdown();
-        serverAndClient.client().shutdown();
-        serverAndClient.server().shutdown();
+        shutdown(serverAndClient);
     }
 
     private NettyTestUtil.NettyServerAndClient createNettyServerAndClient() throws Exception {


[flink] 03/03: [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e7854193816dc348086423b42d4dff12dca4a80e
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Sun Nov 20 15:57:58 2022 +0800

    [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel
---
 .../CreditBasedPartitionRequestClientHandler.java  | 28 ++++----
 .../netty/PartitionRequestClientFactoryTest.java   | 74 ++++++++++++++++++++++
 2 files changed, 86 insertions(+), 16 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 5e8ab08c423..d03abecd7f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -124,22 +124,18 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-        // Unexpected close. In normal operation, the client closes the connection after all input
-        // channels have been removed. This indicates a problem with the remote task manager.
-        if (!inputChannels.isEmpty()) {
-            final SocketAddress remoteAddr = ctx.channel().remoteAddress();
-
-            notifyAllChannelsOfErrorAndClose(
-                    new RemoteTransportException(
-                            "Connection unexpectedly closed by remote task manager '"
-                                    + remoteAddr
-                                    + " [ "
-                                    + connectionID.getResourceID().getStringWithMetadata()
-                                    + " ] "
-                                    + "'. "
-                                    + "This might indicate that the remote task manager was lost.",
-                            remoteAddr));
-        }
+        final SocketAddress remoteAddr = ctx.channel().remoteAddress();
+
+        notifyAllChannelsOfErrorAndClose(
+                new RemoteTransportException(
+                        "Connection unexpectedly closed by remote task manager '"
+                                + remoteAddr
+                                + " [ "
+                                + connectionID.getResourceID().getStringWithMetadata()
+                                + " ] "
+                                + "'. "
+                                + "This might indicate that the remote task manager was lost.",
+                        remoteAddr));
 
         super.channelInactive(ctx);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index 56110d95cbc..d923d76a6c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -27,9 +27,12 @@ import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTe
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
 
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -48,6 +51,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -162,6 +166,56 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
         assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections);
     }
 
+    /**
+     * Verify that the netty client is not reused (a new client is created) when the netty server
+     * closes the channel and there is no input channel.
+     */
+    @TestTemplate
+    void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception {
+        CompletableFuture<Void> inactiveFuture = new CompletableFuture<>();
+        CompletableFuture<Channel> serverChannelFuture = new CompletableFuture<>();
+        NettyProtocol protocol =
+                new NettyProtocol(null, null) {
+                    @Override
+                    public ChannelHandler[] getServerChannelHandlers() {
+                        return new ChannelHandler[] {
+                            // Record the server-side channel so the test can close it
+                            new ChannelInboundHandlerAdapter() {
+                                @Override
+                                public void channelRegistered(ChannelHandlerContext ctx)
+                                        throws Exception {
+                                    super.channelRegistered(ctx);
+                                    serverChannelFuture.complete(ctx.channel());
+                                }
+                            }
+                        };
+                    }
+
+                    @Override
+                    public ChannelHandler[] getClientChannelHandlers() {
+                        return new ChannelHandler[] {
+                            new ChannelInactiveFutureHandler(inactiveFuture)
+                        };
+                    }
+                };
+        NettyTestUtil.NettyServerAndClient serverAndClient = initServerAndClient(protocol);
+
+        PartitionRequestClientFactory factory =
+                new PartitionRequestClientFactory(
+                        serverAndClient.client(), 2, 1, connectionReuseEnabled);
+
+        ConnectionID connectionID = serverAndClient.getConnectionID(RESOURCE_ID, 0);
+        NettyPartitionRequestClient oldClient = factory.createPartitionRequestClient(connectionID);
+
+        // close server channel
+        Channel channel = serverChannelFuture.get();
+        channel.close();
+        inactiveFuture.get();
+        NettyPartitionRequestClient newClient = factory.createPartitionRequestClient(connectionID);
+        assertThat(newClient).as("Factory should create a new client.").isNotSameAs(oldClient);
+        shutdown(serverAndClient);
+    }
+
     @TestTemplate
     void testNettyClientConnectRetry() throws Exception {
         NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient();
@@ -344,4 +398,24 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
             }
         }
     }
+
+    private static class ChannelInactiveFutureHandler
+            extends CreditBasedPartitionRequestClientHandler {
+
+        private final CompletableFuture<Void> inactiveFuture;
+
+        private ChannelInactiveFutureHandler(CompletableFuture<Void> inactiveFuture) {
+            this.inactiveFuture = inactiveFuture;
+        }
+
+        @Override
+        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+            super.channelInactive(ctx);
+            inactiveFuture.complete(null);
+        }
+
+        public CompletableFuture<Void> getInactiveFuture() {
+            return inactiveFuture;
+        }
+    }
 }


[flink] 01/03: [FLINK-28695][hotfix][network] Remove some unused exceptions

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4b10f422f20ca22661a1870eadc343cab4a44567
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Sun Nov 20 15:30:58 2022 +0800

    [FLINK-28695][hotfix][network] Remove some unused exceptions
---
 .../io/network/netty/CreditBasedPartitionRequestClientHandler.java    | 2 +-
 .../flink/runtime/io/network/netty/NettyPartitionRequestClient.java   | 4 ++--
 .../runtime/io/network/netty/PartitionRequestClientFactoryTest.java   | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 6480b49fef7..5e8ab08c423 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -275,7 +275,7 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
         }
     }
 
-    private void decodeMsg(Object msg) throws Throwable {
+    private void decodeMsg(Object msg) {
         final Class<?> msgClazz = msg.getClass();
 
         // ---- Buffer --------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
index de2fc747ecb..2bfa28b6040 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
@@ -133,7 +133,7 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
         final ChannelFutureListener listener =
                 new ChannelFutureListener() {
                     @Override
-                    public void operationComplete(ChannelFuture future) throws Exception {
+                    public void operationComplete(ChannelFuture future) {
                         if (!future.isSuccess()) {
                             clientHandler.removeInputChannel(inputChannel);
                             inputChannel.onError(
@@ -196,7 +196,7 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
                 .addListener(
                         new ChannelFutureListener() {
                             @Override
-                            public void operationComplete(ChannelFuture future) throws Exception {
+                            public void operationComplete(ChannelFuture future) {
                                 if (!future.isSuccess()) {
                                     inputChannel.onError(
                                             new LocalTransportException(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index c676046913d..f8ba8118a77 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -182,7 +182,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
 
     // see https://issues.apache.org/jira/browse/FLINK-18821
     @TestTemplate
-    void testFailureReportedToSubsequentRequests() throws Exception {
+    void testFailureReportedToSubsequentRequests() {
         PartitionRequestClientFactory factory =
                 new PartitionRequestClientFactory(
                         new FailingNettyClient(), 2, 1, connectionReuseEnabled);