You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by et...@apache.org on 2023/03/10 06:43:30 UTC

[incubator-celeborn] branch main updated: [CELEBORN-389][FLINK] Fix remove transportClient from readClientHandler caused NPE (#1323)

This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 2d4a4f25b [CELEBORN-389][FLINK] Fix remove transportClient from readClientHandler caused NPE (#1323)
2d4a4f25b is described below

commit 2d4a4f25bdf497e19a79ed98e95a781b36af5a49
Author: Ethan Feng <et...@apache.org>
AuthorDate: Fri Mar 10 14:43:25 2023 +0800

    [CELEBORN-389][FLINK] Fix remove transportClient from readClientHandler caused NPE (#1323)
---
 .../org/apache/celeborn/plugin/flink/network/ReadClientHandler.java     | 2 +-
 .../apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/ReadClientHandler.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/ReadClientHandler.java
index 86afc80c8..4a7d99d3c 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/ReadClientHandler.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/ReadClientHandler.java
@@ -47,7 +47,7 @@ public class ReadClientHandler extends BaseMessageHandler {
     streamHandlers.remove(streamId);
     TransportClient client = streamClients.remove(streamId);
     // If read handler is removed, we should notify worker to release resource.
-    if (client.isActive()) {
+    if (client != null && client.isActive()) {
       client.getChannel().writeAndFlush(new BufferStreamEnd(streamId));
     }
   }
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
index 336c98a58..6bd86306f 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
@@ -104,7 +104,6 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
         new TransportContext(
             dataTransportConf, readClientHandler, conf.clientCloseIdleConnections());
     this.flinkTransportClientFactory = new FlinkTransportClientFactory(context);
-    this.setupMetaServiceRef(driverHost, port);
   }
 
   public RssBufferStream readBufferedPartition(