You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/21 10:45:09 UTC
[flink] 02/03: [FLINK-28695][refactor][network] Refactor the nettyServerAndClient shutdown and some lambdas
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d4a50a7467ea64840c4a24f5291613d2f7a3e1d5
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Sun Nov 20 15:37:44 2022 +0800
[FLINK-28695][refactor][network] Refactor the nettyServerAndClient shutdown and some lambdas
---
.../network/netty/NettyPartitionRequestClient.java | 98 ++++++++++------------
.../netty/PartitionRequestClientFactoryTest.java | 26 +++---
2 files changed, 55 insertions(+), 69 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
index 2bfa28b6040..cbd823c134c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
@@ -131,29 +131,26 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
inputChannel.getInitialCredit());
final ChannelFutureListener listener =
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) {
- if (!future.isSuccess()) {
- clientHandler.removeInputChannel(inputChannel);
- inputChannel.onError(
- new LocalTransportException(
- String.format(
- "Sending the partition request to '%s [%s] (#%d)' failed.",
- connectionId.getAddress(),
- connectionId
- .getResourceID()
- .getStringWithMetadata(),
- connectionId.getConnectionIndex()),
- future.channel().localAddress(),
- future.cause()));
- sendToChannel(
- new ConnectionErrorMessage(
- future.cause() == null
- ? new RuntimeException(
- "Cannot send partition request.")
- : future.cause()));
- }
+ future -> {
+ if (!future.isSuccess()) {
+ clientHandler.removeInputChannel(inputChannel);
+ inputChannel.onError(
+ new LocalTransportException(
+ String.format(
+ "Sending the partition request to '%s [%s] (#%d)' failed.",
+ connectionId.getAddress(),
+ connectionId
+ .getResourceID()
+ .getStringWithMetadata(),
+ connectionId.getConnectionIndex()),
+ future.channel().localAddress(),
+ future.cause()));
+ sendToChannel(
+ new ConnectionErrorMessage(
+ future.cause() == null
+ ? new RuntimeException(
+ "Cannot send partition request.")
+ : future.cause()));
}
};
@@ -165,12 +162,9 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
tcpChannel
.eventLoop()
.schedule(
- new Runnable() {
- @Override
- public void run() {
- f[0] = tcpChannel.writeAndFlush(request);
- f[0].addListener(listener);
- }
+ () -> {
+ f[0] = tcpChannel.writeAndFlush(request);
+ f[0].addListener(listener);
},
delayMs,
TimeUnit.MILLISECONDS);
@@ -194,30 +188,28 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
.writeAndFlush(
new TaskEventRequest(event, partitionId, inputChannel.getInputChannelId()))
.addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) {
- if (!future.isSuccess()) {
- inputChannel.onError(
- new LocalTransportException(
- String.format(
- "Sending the task event to '%s [%s] (#%d)' failed.",
- connectionId.getAddress(),
- connectionId
- .getResourceID()
- .getStringWithMetadata(),
- connectionId.getConnectionIndex()),
- future.channel().localAddress(),
- future.cause()));
- sendToChannel(
- new ConnectionErrorMessage(
- future.cause() == null
- ? new RuntimeException(
- "Cannot send task event.")
- : future.cause()));
- }
- }
- });
+ (ChannelFutureListener)
+ future -> {
+ if (!future.isSuccess()) {
+ inputChannel.onError(
+ new LocalTransportException(
+ String.format(
+ "Sending the task event to '%s [%s] (#%d)' failed.",
+ connectionId.getAddress(),
+ connectionId
+ .getResourceID()
+ .getStringWithMetadata(),
+ connectionId.getConnectionIndex()),
+ future.channel().localAddress(),
+ future.cause()));
+ sendToChannel(
+ new ConnectionErrorMessage(
+ future.cause() == null
+ ? new RuntimeException(
+ "Cannot send task event.")
+ : future.cause()));
+ }
+ });
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index f8ba8118a77..56110d95cbc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -48,6 +48,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.fail;
@@ -81,8 +82,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
factory.createPartitionRequestClient(
nettyServerAndClient.getConnectionID(RESOURCE_ID, 0));
} finally {
- nettyServerAndClient.client().shutdown();
- nettyServerAndClient.server().shutdown();
+ shutdown(nettyServerAndClient);
}
}
@@ -125,8 +125,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
factory.createPartitionRequestClient(connectionID);
} finally {
- nettyServerAndClient.client().shutdown();
- nettyServerAndClient.server().shutdown();
+ shutdown(nettyServerAndClient);
}
}
@@ -139,8 +138,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
checkReuseNettyPartitionRequestClient(nettyServerAndClient, 5);
checkReuseNettyPartitionRequestClient(nettyServerAndClient, 10);
} finally {
- nettyServerAndClient.client().shutdown();
- nettyServerAndClient.server().shutdown();
+ shutdown(nettyServerAndClient);
}
}
@@ -176,8 +174,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
factory.createPartitionRequestClient(serverAndClient.getConnectionID(RESOURCE_ID, 0));
- serverAndClient.client().shutdown();
- serverAndClient.server().shutdown();
+ shutdown(serverAndClient);
}
// see https://issues.apache.org/jira/browse/FLINK-18821
@@ -218,14 +215,12 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
unstableNettyClient, 2, 1, connectionReuseEnabled);
assertThatThrownBy(
- () -> {
- factory.createPartitionRequestClient(
- serverAndClient.getConnectionID(RESOURCE_ID, 0));
- })
+ () ->
+ factory.createPartitionRequestClient(
+ serverAndClient.getConnectionID(RESOURCE_ID, 0)))
.isInstanceOf(IOException.class);
} finally {
- serverAndClient.client().shutdown();
- serverAndClient.server().shutdown();
+ shutdown(serverAndClient);
}
}
@@ -274,8 +269,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
});
threadPoolExecutor.shutdown();
- serverAndClient.client().shutdown();
- serverAndClient.server().shutdown();
+ shutdown(serverAndClient);
}
private NettyTestUtil.NettyServerAndClient createNettyServerAndClient() throws Exception {