You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/21 10:45:09 UTC

[flink] 02/03: [FLINK-28695][refactor][network] Refactor the nettyServerAndClient shutdown and some lambdas

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d4a50a7467ea64840c4a24f5291613d2f7a3e1d5
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Sun Nov 20 15:37:44 2022 +0800

    [FLINK-28695][refactor][network] Refactor the nettyServerAndClient shutdown and some lambdas
---
 .../network/netty/NettyPartitionRequestClient.java | 98 ++++++++++------------
 .../netty/PartitionRequestClientFactoryTest.java   | 26 +++---
 2 files changed, 55 insertions(+), 69 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
index 2bfa28b6040..cbd823c134c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
@@ -131,29 +131,26 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
                         inputChannel.getInitialCredit());
 
         final ChannelFutureListener listener =
-                new ChannelFutureListener() {
-                    @Override
-                    public void operationComplete(ChannelFuture future) {
-                        if (!future.isSuccess()) {
-                            clientHandler.removeInputChannel(inputChannel);
-                            inputChannel.onError(
-                                    new LocalTransportException(
-                                            String.format(
-                                                    "Sending the partition request to '%s [%s] (#%d)' failed.",
-                                                    connectionId.getAddress(),
-                                                    connectionId
-                                                            .getResourceID()
-                                                            .getStringWithMetadata(),
-                                                    connectionId.getConnectionIndex()),
-                                            future.channel().localAddress(),
-                                            future.cause()));
-                            sendToChannel(
-                                    new ConnectionErrorMessage(
-                                            future.cause() == null
-                                                    ? new RuntimeException(
-                                                            "Cannot send partition request.")
-                                                    : future.cause()));
-                        }
+                future -> {
+                    if (!future.isSuccess()) {
+                        clientHandler.removeInputChannel(inputChannel);
+                        inputChannel.onError(
+                                new LocalTransportException(
+                                        String.format(
+                                                "Sending the partition request to '%s [%s] (#%d)' failed.",
+                                                connectionId.getAddress(),
+                                                connectionId
+                                                        .getResourceID()
+                                                        .getStringWithMetadata(),
+                                                connectionId.getConnectionIndex()),
+                                        future.channel().localAddress(),
+                                        future.cause()));
+                        sendToChannel(
+                                new ConnectionErrorMessage(
+                                        future.cause() == null
+                                                ? new RuntimeException(
+                                                        "Cannot send partition request.")
+                                                : future.cause()));
                     }
                 };
 
@@ -165,12 +162,9 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
             tcpChannel
                     .eventLoop()
                     .schedule(
-                            new Runnable() {
-                                @Override
-                                public void run() {
-                                    f[0] = tcpChannel.writeAndFlush(request);
-                                    f[0].addListener(listener);
-                                }
+                            () -> {
+                                f[0] = tcpChannel.writeAndFlush(request);
+                                f[0].addListener(listener);
                             },
                             delayMs,
                             TimeUnit.MILLISECONDS);
@@ -194,30 +188,28 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
                 .writeAndFlush(
                         new TaskEventRequest(event, partitionId, inputChannel.getInputChannelId()))
                 .addListener(
-                        new ChannelFutureListener() {
-                            @Override
-                            public void operationComplete(ChannelFuture future) {
-                                if (!future.isSuccess()) {
-                                    inputChannel.onError(
-                                            new LocalTransportException(
-                                                    String.format(
-                                                            "Sending the task event to '%s [%s] (#%d)' failed.",
-                                                            connectionId.getAddress(),
-                                                            connectionId
-                                                                    .getResourceID()
-                                                                    .getStringWithMetadata(),
-                                                            connectionId.getConnectionIndex()),
-                                                    future.channel().localAddress(),
-                                                    future.cause()));
-                                    sendToChannel(
-                                            new ConnectionErrorMessage(
-                                                    future.cause() == null
-                                                            ? new RuntimeException(
-                                                                    "Cannot send task event.")
-                                                            : future.cause()));
-                                }
-                            }
-                        });
+                        (ChannelFutureListener)
+                                future -> {
+                                    if (!future.isSuccess()) {
+                                        inputChannel.onError(
+                                                new LocalTransportException(
+                                                        String.format(
+                                                                "Sending the task event to '%s [%s] (#%d)' failed.",
+                                                                connectionId.getAddress(),
+                                                                connectionId
+                                                                        .getResourceID()
+                                                                        .getStringWithMetadata(),
+                                                                connectionId.getConnectionIndex()),
+                                                        future.channel().localAddress(),
+                                                        future.cause()));
+                                        sendToChannel(
+                                                new ConnectionErrorMessage(
+                                                        future.cause() == null
+                                                                ? new RuntimeException(
+                                                                        "Cannot send task event.")
+                                                                : future.cause()));
+                                    }
+                                });
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index f8ba8118a77..56110d95cbc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -48,6 +48,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -81,8 +82,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
             factory.createPartitionRequestClient(
                     nettyServerAndClient.getConnectionID(RESOURCE_ID, 0));
         } finally {
-            nettyServerAndClient.client().shutdown();
-            nettyServerAndClient.server().shutdown();
+            shutdown(nettyServerAndClient);
         }
     }
 
@@ -125,8 +125,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
 
             factory.createPartitionRequestClient(connectionID);
         } finally {
-            nettyServerAndClient.client().shutdown();
-            nettyServerAndClient.server().shutdown();
+            shutdown(nettyServerAndClient);
         }
     }
 
@@ -139,8 +138,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
             checkReuseNettyPartitionRequestClient(nettyServerAndClient, 5);
             checkReuseNettyPartitionRequestClient(nettyServerAndClient, 10);
         } finally {
-            nettyServerAndClient.client().shutdown();
-            nettyServerAndClient.server().shutdown();
+            shutdown(nettyServerAndClient);
         }
     }
 
@@ -176,8 +174,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
 
         factory.createPartitionRequestClient(serverAndClient.getConnectionID(RESOURCE_ID, 0));
 
-        serverAndClient.client().shutdown();
-        serverAndClient.server().shutdown();
+        shutdown(serverAndClient);
     }
 
     // see https://issues.apache.org/jira/browse/FLINK-18821
@@ -218,14 +215,12 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                             unstableNettyClient, 2, 1, connectionReuseEnabled);
 
             assertThatThrownBy(
-                            () -> {
-                                factory.createPartitionRequestClient(
-                                        serverAndClient.getConnectionID(RESOURCE_ID, 0));
-                            })
+                            () ->
+                                    factory.createPartitionRequestClient(
+                                            serverAndClient.getConnectionID(RESOURCE_ID, 0)))
                     .isInstanceOf(IOException.class);
         } finally {
-            serverAndClient.client().shutdown();
-            serverAndClient.server().shutdown();
+            shutdown(serverAndClient);
         }
     }
 
@@ -274,8 +269,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                 });
 
         threadPoolExecutor.shutdown();
-        serverAndClient.client().shutdown();
-        serverAndClient.server().shutdown();
+        shutdown(serverAndClient);
     }
 
     private NettyTestUtil.NettyServerAndClient createNettyServerAndClient() throws Exception {