You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/01/13 08:21:05 UTC
[incubator-celeborn] branch branch-0.1 updated: [CELEBORN-212] Refresh client if current client is inactive
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.1
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.1 by this push:
new b190d3bd [CELEBORN-212] Refresh client if current client is inactive
b190d3bd is described below
commit b190d3bd6663ea014614e2d1e17708b59a888833
Author: zky.zhoukeyong <zk...@alibaba-inc.com>
AuthorDate: Fri Jan 13 16:20:34 2023 +0800
[CELEBORN-212] Refresh client if current client is inactive
---
.../aliyun/emr/rss/client/read/RssInputStream.java | 28 +++++++++++++++-------
.../service/deploy/master/MasterUtilSuiteJ.java | 2 +-
2 files changed, 21 insertions(+), 9 deletions(-)
diff --git a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java
index 1729eb2a..4f38f7c5 100644
--- a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java
+++ b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java
@@ -33,10 +33,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
-import com.aliyun.emr.rss.common.network.client.TransportClient;
-import com.aliyun.emr.rss.common.network.protocol.Message;
-import com.aliyun.emr.rss.common.network.protocol.OpenStream;
-import com.aliyun.emr.rss.common.network.protocol.StreamHandle;
import io.netty.buffer.ByteBuf;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
@@ -47,9 +43,14 @@ import com.aliyun.emr.rss.common.RssConf;
import com.aliyun.emr.rss.common.network.buffer.ManagedBuffer;
import com.aliyun.emr.rss.common.network.buffer.NettyManagedBuffer;
import com.aliyun.emr.rss.common.network.client.ChunkReceivedCallback;
+import com.aliyun.emr.rss.common.network.client.TransportClient;
import com.aliyun.emr.rss.common.network.client.TransportClientFactory;
+import com.aliyun.emr.rss.common.network.protocol.Message;
+import com.aliyun.emr.rss.common.network.protocol.OpenStream;
+import com.aliyun.emr.rss.common.network.protocol.StreamHandle;
import com.aliyun.emr.rss.common.protocol.PartitionLocation;
import com.aliyun.emr.rss.common.unsafe.Platform;
+import com.aliyun.emr.rss.common.util.ExceptionUtils;
public abstract class RssInputStream extends InputStream {
private static final Logger logger = LoggerFactory.getLogger(RssInputStream.class);
@@ -437,7 +438,6 @@ public abstract class RssInputStream extends InputStream {
private final class PartitionReader {
private PartitionLocation location;
- private TransportClient client;
private StreamHandle streamHandle;
private int returnedChunks;
@@ -482,6 +482,7 @@ public abstract class RssInputStream extends InputStream {
exception.set(new IOException(errorMsg, e));
}
};
+ TransportClient client;
try {
client = clientFactory.createClient(location.getHost(), location.getFetchPort());
} catch (InterruptedException ie) {
@@ -533,7 +534,7 @@ public abstract class RssInputStream extends InputStream {
results.clear();
}
- private void fetchChunks() {
+ private void fetchChunks() throws IOException {
final int inFlight = chunkIndex - returnedChunks;
if (inFlight < maxInFlight) {
final int toFetch = Math.min(maxInFlight - inFlight + 1,
@@ -542,8 +543,19 @@ public abstract class RssInputStream extends InputStream {
if (testFetch && fetchChunkRetryCnt < fetchChunkMaxRetry - 1 && chunkIndex == 3) {
callback.onFailure(chunkIndex, new IOException("Test fetch chunk failure"));
} else {
- client.fetchChunk(streamHandle.streamId, chunkIndex, callback);
- chunkIndex++;
+ try {
+ TransportClient client = clientFactory.createClient(location.getHost(),
+ location.getFetchPort());
+ client.fetchChunk(streamHandle.streamId, chunkIndex, callback);
+ chunkIndex++;
+ } catch (IOException | InterruptedException e) {
+ logger.error(
+ "fetchChunk for streamId: {}, chunkIndex: {} failed.",
+ streamHandle.streamId,
+ chunkIndex,
+ e);
+ ExceptionUtils.wrapAndThrowIOException(e);
+ }
}
}
}
diff --git a/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/MasterUtilSuiteJ.java b/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/MasterUtilSuiteJ.java
index 61dae921..a31870c3 100644
--- a/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/MasterUtilSuiteJ.java
+++ b/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/MasterUtilSuiteJ.java
@@ -23,11 +23,11 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-import com.aliyun.emr.rss.common.RssConf;
import scala.Tuple2;
import org.junit.Test;
+import com.aliyun.emr.rss.common.RssConf;
import com.aliyun.emr.rss.common.meta.WorkerInfo;
import com.aliyun.emr.rss.common.protocol.PartitionLocation;