You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by et...@apache.org on 2022/11/28 12:17:12 UTC

[incubator-celeborn] branch main updated: [CELEBORN-68][FOLLOWUP] Retry on same partition location should have a retry wait interval (#1017)

This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 13f4ce2b [CELEBORN-68][FOLLOWUP] Retry on same partition location should have a retry wait interval (#1017)
13f4ce2b is described below

commit 13f4ce2be69ada5deacd0c3ff0b307fb4383db66
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Mon Nov 28 20:17:08 2022 +0800

    [CELEBORN-68][FOLLOWUP] Retry on same partition location should have a retry wait interval (#1017)
---
 .../java/org/apache/celeborn/client/read/RssInputStream.java  | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
index 17081751..70eb7272 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
@@ -20,8 +20,10 @@ package org.apache.celeborn.client.read;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 
+import com.google.common.util.concurrent.Uninterruptibles;
 import io.netty.buffer.ByteBuf;
 import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
@@ -30,8 +32,10 @@ import org.slf4j.LoggerFactory;
 import org.apache.celeborn.client.compress.Decompressor;
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.network.client.TransportClientFactory;
+import org.apache.celeborn.common.network.util.TransportConf;
 import org.apache.celeborn.common.protocol.PartitionLocation;
 import org.apache.celeborn.common.protocol.StorageInfo;
+import org.apache.celeborn.common.protocol.TransportModuleConstants;
 import org.apache.celeborn.common.unsafe.Platform;
 import org.apache.celeborn.common.util.Utils;
 
@@ -107,6 +111,7 @@ public abstract class RssInputStream extends InputStream {
     private PartitionReader currentReader;
     private final int fetchChunkMaxRetry;
     private int fetchChunkRetryCnt = 0;
+    int retryWaitMs;
     private int fileIndex;
     private int position;
     private int limit;
@@ -147,7 +152,9 @@ public abstract class RssInputStream extends InputStream {
       decompressor = Decompressor.getDecompressor(conf);
 
       fetchChunkMaxRetry = conf.fetchMaxRetries();
-
+      TransportConf transportConf =
+          Utils.fromCelebornConf(conf, TransportModuleConstants.DATA_MODULE, 0);
+      retryWaitMs = transportConf.ioRetryWaitTimeMs();
       moveToNextReader();
     }
 
@@ -231,6 +238,7 @@ public abstract class RssInputStream extends InputStream {
                 "CreatePartitionReader failed {}/{} times, retry the same location",
                 fetchChunkRetryCnt,
                 fetchChunkMaxRetry);
+            Uninterruptibles.sleepUninterruptibly(retryWaitMs, TimeUnit.MILLISECONDS);
           }
         }
       }
@@ -256,6 +264,7 @@ public abstract class RssInputStream extends InputStream {
               currentReader = createReaderWithRetry(currentReader.getLocation().getPeer());
             } else {
               logger.warn("Fetch chunk failed {}/{} times", fetchChunkRetryCnt, fetchChunkMaxRetry);
+              Uninterruptibles.sleepUninterruptibly(retryWaitMs, TimeUnit.MILLISECONDS);
               currentReader = createReaderWithRetry(currentReader.getLocation());
             }
           }