You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by et...@apache.org on 2022/11/28 12:17:12 UTC
[incubator-celeborn] branch main updated: [CELEBORN-68][FOLLOWUP] Retry on same partition location should have a retry wait interval (#1017)
This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 13f4ce2b [CELEBORN-68][FOLLOWUP] Retry on same partition location should have a retry wait interval (#1017)
13f4ce2b is described below
commit 13f4ce2be69ada5deacd0c3ff0b307fb4383db66
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Mon Nov 28 20:17:08 2022 +0800
[CELEBORN-68][FOLLOWUP] Retry on same partition location should have a retry wait interval (#1017)
---
.../java/org/apache/celeborn/client/read/RssInputStream.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
index 17081751..70eb7272 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
@@ -20,8 +20,10 @@ package org.apache.celeborn.client.read;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
+import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.buffer.ByteBuf;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
@@ -30,8 +32,10 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.client.compress.Decompressor;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.network.client.TransportClientFactory;
+import org.apache.celeborn.common.network.util.TransportConf;
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.common.protocol.StorageInfo;
+import org.apache.celeborn.common.protocol.TransportModuleConstants;
import org.apache.celeborn.common.unsafe.Platform;
import org.apache.celeborn.common.util.Utils;
@@ -107,6 +111,7 @@ public abstract class RssInputStream extends InputStream {
private PartitionReader currentReader;
private final int fetchChunkMaxRetry;
private int fetchChunkRetryCnt = 0;
+ int retryWaitMs;
private int fileIndex;
private int position;
private int limit;
@@ -147,7 +152,9 @@ public abstract class RssInputStream extends InputStream {
decompressor = Decompressor.getDecompressor(conf);
fetchChunkMaxRetry = conf.fetchMaxRetries();
-
+ TransportConf transportConf =
+ Utils.fromCelebornConf(conf, TransportModuleConstants.DATA_MODULE, 0);
+ retryWaitMs = transportConf.ioRetryWaitTimeMs();
moveToNextReader();
}
@@ -231,6 +238,7 @@ public abstract class RssInputStream extends InputStream {
"CreatePartitionReader failed {}/{} times, retry the same location",
fetchChunkRetryCnt,
fetchChunkMaxRetry);
+ Uninterruptibles.sleepUninterruptibly(retryWaitMs, TimeUnit.MILLISECONDS);
}
}
}
@@ -256,6 +264,7 @@ public abstract class RssInputStream extends InputStream {
currentReader = createReaderWithRetry(currentReader.getLocation().getPeer());
} else {
logger.warn("Fetch chunk failed {}/{} times", fetchChunkRetryCnt, fetchChunkMaxRetry);
+ Uninterruptibles.sleepUninterruptibly(retryWaitMs, TimeUnit.MILLISECONDS);
currentReader = createReaderWithRetry(currentReader.getLocation());
}
}