You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/01/11 03:54:54 UTC

[incubator-celeborn] branch main updated: [CELEBORN-212] refresh client if current client is inactive. (#1159)

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 810a8d01 [CELEBORN-212] refresh client if current client is inactive. (#1159)
810a8d01 is described below

commit 810a8d01e083d8df663bcd9482f0d1178fbe1ccc
Author: Shuang <lv...@gmail.com>
AuthorDate: Wed Jan 11 11:54:50 2023 +0800

    [CELEBORN-212] refresh client if current client is inactive. (#1159)
---
 .../client/read/WorkerPartitionReader.java         | 23 ++++++++++++----
 .../celeborn/common/util/ExceptionUtils.java       | 31 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 5 deletions(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
index 9d154f58..2a9bae7b 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
@@ -38,11 +38,12 @@ import org.apache.celeborn.common.network.protocol.Message;
 import org.apache.celeborn.common.network.protocol.OpenStream;
 import org.apache.celeborn.common.network.protocol.StreamHandle;
 import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.ExceptionUtils;
 
 public class WorkerPartitionReader implements PartitionReader {
   private final Logger logger = LoggerFactory.getLogger(WorkerPartitionReader.class);
   private PartitionLocation location;
-  private final TransportClient client;
+  private final TransportClientFactory clientFactory;
   private StreamHandle streamHandle;
 
   private int returnedChunks;
@@ -94,6 +95,7 @@ public class WorkerPartitionReader implements PartitionReader {
             exception.set(new IOException(errorMsg, e));
           }
         };
+    TransportClient client;
     try {
       client = clientFactory.createClient(location.getHost(), location.getFetchPort());
     } catch (InterruptedException ie) {
@@ -106,7 +108,7 @@ public class WorkerPartitionReader implements PartitionReader {
     streamHandle = (StreamHandle) Message.decode(response);
 
     this.location = location;
-
+    this.clientFactory = clientFactory;
     this.fetchChunkRetryCnt = fetchChunkRetryCnt;
     this.fetchChunkMaxRetry = fetchChunkMaxRetry;
     testFetch = conf.testFetchFailure();
@@ -152,7 +154,7 @@ public class WorkerPartitionReader implements PartitionReader {
     return location;
   }
 
-  private void fetchChunks() {
+  private void fetchChunks() throws IOException {
     final int inFlight = chunkIndex - returnedChunks;
     if (inFlight < fetchMaxReqsInFlight) {
       final int toFetch =
@@ -161,8 +163,19 @@ public class WorkerPartitionReader implements PartitionReader {
         if (testFetch && fetchChunkRetryCnt < fetchChunkMaxRetry - 1 && chunkIndex == 3) {
           callback.onFailure(chunkIndex, new IOException("Test fetch chunk failure"));
         } else {
-          client.fetchChunk(streamHandle.streamId, chunkIndex, callback);
-          chunkIndex++;
+          try {
+            TransportClient client =
+                clientFactory.createClient(location.getHost(), location.getFetchPort());
+            client.fetchChunk(streamHandle.streamId, chunkIndex, callback);
+            chunkIndex++;
+          } catch (IOException | InterruptedException e) {
+            logger.error(
+                "fetchChunk for streamId: {}, chunkIndex: {} failed.",
+                streamHandle.streamId,
+                chunkIndex,
+                e);
+            ExceptionUtils.wrapAndThrowIOException(e);
+          }
         }
       }
     }
diff --git a/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
new file mode 100644
index 00000000..5336f6d1
--- /dev/null
+++ b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.util;
+
+import java.io.IOException;
+
+/** Helpers for adapting arbitrary exceptions to {@link IOException}. */
+public class ExceptionUtils {
+
+  /**
+   * Rethrows {@code exception} as an {@link IOException}; this method never returns normally.
+   *
+   * <p>An {@link IOException} is rethrown unchanged. Any other exception is wrapped in a new
+   * {@link IOException} that carries the original message and keeps the original as its cause.
+   *
+   * <p>If {@code exception} is an {@link InterruptedException}, the current thread's interrupt
+   * status is restored before wrapping. Catching an {@code InterruptedException} clears that
+   * status, and once the exception is wrapped callers can no longer tell it was an interruption.
+   *
+   * @param exception the exception to rethrow; passing {@code null} results in a
+   *     {@link NullPointerException}
+   * @throws IOException always: either {@code exception} itself or a wrapper around it
+   */
+  public static void wrapAndThrowIOException(Exception exception) throws IOException {
+    if (exception instanceof IOException) {
+      throw (IOException) exception;
+    }
+    if (exception instanceof InterruptedException) {
+      // Re-assert the interrupt that was consumed when the caller caught the exception.
+      Thread.currentThread().interrupt();
+    }
+    throw new IOException(exception.getMessage(), exception);
+  }
+}