You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by et...@apache.org on 2023/03/10 06:43:30 UTC
[incubator-celeborn] branch main updated: [CELEBORN-389][FLINK] Fix NPE caused by removing transportClient from readClientHandler (#1323)
This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 2d4a4f25b [CELEBORN-389][FLINK] Fix NPE caused by removing transportClient from readClientHandler (#1323)
2d4a4f25b is described below
commit 2d4a4f25bdf497e19a79ed98e95a781b36af5a49
Author: Ethan Feng <et...@apache.org>
AuthorDate: Fri Mar 10 14:43:25 2023 +0800
[CELEBORN-389][FLINK] Fix NPE caused by removing transportClient from readClientHandler (#1323)
---
.../org/apache/celeborn/plugin/flink/network/ReadClientHandler.java | 2 +-
.../apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java | 1 -
2 files changed, 1 insertion(+), 2 deletions(-)
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/ReadClientHandler.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/ReadClientHandler.java
index 86afc80c8..4a7d99d3c 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/ReadClientHandler.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/ReadClientHandler.java
@@ -47,7 +47,7 @@ public class ReadClientHandler extends BaseMessageHandler {
streamHandlers.remove(streamId);
TransportClient client = streamClients.remove(streamId);
// If read handler is removed, we should notify worker to release resource.
- if (client.isActive()) {
+ if (client != null && client.isActive()) {
client.getChannel().writeAndFlush(new BufferStreamEnd(streamId));
}
}
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
index 336c98a58..6bd86306f 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
@@ -104,7 +104,6 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
new TransportContext(
dataTransportConf, readClientHandler, conf.clientCloseIdleConnections());
this.flinkTransportClientFactory = new FlinkTransportClientFactory(context);
- this.setupMetaServiceRef(driverHost, port);
}
public RssBufferStream readBufferedPartition(