You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@celeborn.apache.org by et...@apache.org on 2022/11/23 10:08:02 UTC

[incubator-celeborn] branch main updated: [CELEBORN-50] Channel inActive may cause new client use old stream id to fetch data cause IllegalStateException. (#1000)

This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new f1c4d675 [CELEBORN-50] Channel inActive may cause new client use old stream id to fetch data cause IllegalStateException. (#1000)
f1c4d675 is described below

commit f1c4d675d62eed9b9776167f76ffddfec08fe8d7
Author: Ethan Feng <et...@apache.org>
AuthorDate: Wed Nov 23 18:07:57 2022 +0800

    [CELEBORN-50] Channel inActive may cause new client use old stream id to fetch data cause IllegalStateException. (#1000)
---
 .../org/apache/celeborn/client/read/Replica.java   | 27 ++++++++++++++++++----
 1 file changed, 23 insertions(+), 4 deletions(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/read/Replica.java b/client/src/main/java/org/apache/celeborn/client/read/Replica.java
index 40e60b88..5b8f1ed1 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/Replica.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/Replica.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.common.network.client.TransportClient;
 import org.apache.celeborn.common.network.client.TransportClientFactory;
@@ -30,6 +32,7 @@ import org.apache.celeborn.common.network.protocol.StreamHandle;
 import org.apache.celeborn.common.protocol.PartitionLocation;
 
 class Replica {
+  private Logger logger = LoggerFactory.getLogger(Replica.class);
   private final long timeoutMs;
   private final String shuffleKey;
   private final PartitionLocation location;
@@ -57,17 +60,33 @@ class Replica {
 
   public synchronized TransportClient getOrOpenStream() throws IOException, InterruptedException {
     if (client == null || !client.isActive()) {
+      if (client != null) {
+        logger.warn(
+            "Current channel from "
+                + client.getChannel().localAddress()
+                + " to "
+                + client.getChannel().remoteAddress()
+                + " for "
+                + this
+                + " is not active.");
+      }
       client = clientFactory.createClient(location.getHost(), location.getFetchPort());
+      openStreamInternal();
     }
+    // For retried open stream if openStream rpc is failed.
     if (streamHandle == null) {
-      OpenStream openBlocks =
-          new OpenStream(shuffleKey, location.getFileName(), startMapIndex, endMapIndex);
-      ByteBuffer response = client.sendRpcSync(openBlocks.toByteBuffer(), timeoutMs);
-      streamHandle = (StreamHandle) Message.decode(response);
+      openStreamInternal();
     }
     return client;
   }
 
+  private void openStreamInternal() {
+    OpenStream openBlocks =
+        new OpenStream(shuffleKey, location.getFileName(), startMapIndex, endMapIndex);
+    ByteBuffer response = client.sendRpcSync(openBlocks.toByteBuffer(), timeoutMs);
+    streamHandle = (StreamHandle) Message.decode(response);
+  }
+
   public long getStreamId() {
     return streamHandle.streamId;
   }