You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/01/13 08:21:05 UTC

[incubator-celeborn] branch branch-0.1 updated: [CELEBORN-212] Refresh client if current client is inactive

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.1
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.1 by this push:
     new b190d3bd [CELEBORN-212] Refresh client if current client is inactive
b190d3bd is described below

commit b190d3bd6663ea014614e2d1e17708b59a888833
Author: zky.zhoukeyong <zk...@alibaba-inc.com>
AuthorDate: Fri Jan 13 16:20:34 2023 +0800

    [CELEBORN-212] Refresh client if current client is inactive
---
 .../aliyun/emr/rss/client/read/RssInputStream.java | 28 +++++++++++++++-------
 .../service/deploy/master/MasterUtilSuiteJ.java    |  2 +-
 2 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java
index 1729eb2a..4f38f7c5 100644
--- a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java
+++ b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java
@@ -33,10 +33,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
 
-import com.aliyun.emr.rss.common.network.client.TransportClient;
-import com.aliyun.emr.rss.common.network.protocol.Message;
-import com.aliyun.emr.rss.common.network.protocol.OpenStream;
-import com.aliyun.emr.rss.common.network.protocol.StreamHandle;
 import io.netty.buffer.ByteBuf;
 import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
@@ -47,9 +43,14 @@ import com.aliyun.emr.rss.common.RssConf;
 import com.aliyun.emr.rss.common.network.buffer.ManagedBuffer;
 import com.aliyun.emr.rss.common.network.buffer.NettyManagedBuffer;
 import com.aliyun.emr.rss.common.network.client.ChunkReceivedCallback;
+import com.aliyun.emr.rss.common.network.client.TransportClient;
 import com.aliyun.emr.rss.common.network.client.TransportClientFactory;
+import com.aliyun.emr.rss.common.network.protocol.Message;
+import com.aliyun.emr.rss.common.network.protocol.OpenStream;
+import com.aliyun.emr.rss.common.network.protocol.StreamHandle;
 import com.aliyun.emr.rss.common.protocol.PartitionLocation;
 import com.aliyun.emr.rss.common.unsafe.Platform;
+import com.aliyun.emr.rss.common.util.ExceptionUtils;
 
 public abstract class RssInputStream extends InputStream {
   private static final Logger logger = LoggerFactory.getLogger(RssInputStream.class);
@@ -437,7 +438,6 @@ public abstract class RssInputStream extends InputStream {
 
     private final class PartitionReader {
       private PartitionLocation location;
-      private TransportClient client;
       private StreamHandle streamHandle;
 
       private int returnedChunks;
@@ -482,6 +482,7 @@ public abstract class RssInputStream extends InputStream {
             exception.set(new IOException(errorMsg, e));
           }
         };
+        TransportClient client;
         try {
           client = clientFactory.createClient(location.getHost(), location.getFetchPort());
         } catch (InterruptedException ie) {
@@ -533,7 +534,7 @@ public abstract class RssInputStream extends InputStream {
         results.clear();
       }
 
-      private void fetchChunks() {
+      private void fetchChunks() throws IOException {
         final int inFlight = chunkIndex - returnedChunks;
         if (inFlight < maxInFlight) {
           final int toFetch = Math.min(maxInFlight - inFlight + 1,
@@ -542,8 +543,19 @@ public abstract class RssInputStream extends InputStream {
             if (testFetch && fetchChunkRetryCnt < fetchChunkMaxRetry - 1 && chunkIndex == 3) {
               callback.onFailure(chunkIndex, new IOException("Test fetch chunk failure"));
             } else {
-              client.fetchChunk(streamHandle.streamId, chunkIndex, callback);
-              chunkIndex++;
+              try {
+                TransportClient client = clientFactory.createClient(location.getHost(),
+                  location.getFetchPort());
+                client.fetchChunk(streamHandle.streamId, chunkIndex, callback);
+                chunkIndex++;
+              } catch (IOException | InterruptedException e) {
+                logger.error(
+                  "fetchChunk for streamId: {}, chunkIndex: {} failed.",
+                  streamHandle.streamId,
+                  chunkIndex,
+                  e);
+                ExceptionUtils.wrapAndThrowIOException(e);
+              }
             }
           }
         }
diff --git a/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/MasterUtilSuiteJ.java b/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/MasterUtilSuiteJ.java
index 61dae921..a31870c3 100644
--- a/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/MasterUtilSuiteJ.java
+++ b/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/MasterUtilSuiteJ.java
@@ -23,11 +23,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import com.aliyun.emr.rss.common.RssConf;
 import scala.Tuple2;
 
 import org.junit.Test;
 
+import com.aliyun.emr.rss.common.RssConf;
 import com.aliyun.emr.rss.common.meta.WorkerInfo;
 import com.aliyun.emr.rss.common.protocol.PartitionLocation;