You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/01/11 03:54:54 UTC
[incubator-celeborn] branch main updated: [CELEBORN-212] refresh client if current client is inactive. (#1159)
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 810a8d01 [CELEBORN-212] refresh client if current client is inactive. (#1159)
810a8d01 is described below
commit 810a8d01e083d8df663bcd9482f0d1178fbe1ccc
Author: Shuang <lv...@gmail.com>
AuthorDate: Wed Jan 11 11:54:50 2023 +0800
[CELEBORN-212] refresh client if current client is inactive. (#1159)
---
.../client/read/WorkerPartitionReader.java | 23 ++++++++++++----
.../celeborn/common/util/ExceptionUtils.java | 31 ++++++++++++++++++++++
2 files changed, 49 insertions(+), 5 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
index 9d154f58..2a9bae7b 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
@@ -38,11 +38,12 @@ import org.apache.celeborn.common.network.protocol.Message;
import org.apache.celeborn.common.network.protocol.OpenStream;
import org.apache.celeborn.common.network.protocol.StreamHandle;
import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.ExceptionUtils;
public class WorkerPartitionReader implements PartitionReader {
private final Logger logger = LoggerFactory.getLogger(WorkerPartitionReader.class);
private PartitionLocation location;
- private final TransportClient client;
+ private final TransportClientFactory clientFactory;
private StreamHandle streamHandle;
private int returnedChunks;
@@ -94,6 +95,7 @@ public class WorkerPartitionReader implements PartitionReader {
exception.set(new IOException(errorMsg, e));
}
};
+ TransportClient client;
try {
client = clientFactory.createClient(location.getHost(), location.getFetchPort());
} catch (InterruptedException ie) {
@@ -106,7 +108,7 @@ public class WorkerPartitionReader implements PartitionReader {
streamHandle = (StreamHandle) Message.decode(response);
this.location = location;
-
+ this.clientFactory = clientFactory;
this.fetchChunkRetryCnt = fetchChunkRetryCnt;
this.fetchChunkMaxRetry = fetchChunkMaxRetry;
testFetch = conf.testFetchFailure();
@@ -152,7 +154,7 @@ public class WorkerPartitionReader implements PartitionReader {
return location;
}
- private void fetchChunks() {
+ private void fetchChunks() throws IOException {
final int inFlight = chunkIndex - returnedChunks;
if (inFlight < fetchMaxReqsInFlight) {
final int toFetch =
@@ -161,8 +163,19 @@ public class WorkerPartitionReader implements PartitionReader {
if (testFetch && fetchChunkRetryCnt < fetchChunkMaxRetry - 1 && chunkIndex == 3) {
callback.onFailure(chunkIndex, new IOException("Test fetch chunk failure"));
} else {
- client.fetchChunk(streamHandle.streamId, chunkIndex, callback);
- chunkIndex++;
+ try {
+ TransportClient client =
+ clientFactory.createClient(location.getHost(), location.getFetchPort());
+ client.fetchChunk(streamHandle.streamId, chunkIndex, callback);
+ chunkIndex++;
+ } catch (IOException | InterruptedException e) {
+ logger.error(
+ "fetchChunk for streamId: {}, chunkIndex: {} failed.",
+ streamHandle.streamId,
+ chunkIndex,
+ e);
+ ExceptionUtils.wrapAndThrowIOException(e);
+ }
}
}
}
diff --git a/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
new file mode 100644
index 00000000..5336f6d1
--- /dev/null
+++ b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.util;
+
+import java.io.IOException;
+
+public class ExceptionUtils {
+
+ public static void wrapAndThrowIOException(Exception exception) throws IOException {
+ if (exception instanceof IOException) {
+ throw (IOException) exception;
+ } else {
+ throw new IOException(exception.getMessage(), exception);
+ }
+ }
+}