You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by an...@apache.org on 2023/01/30 09:47:27 UTC

[incubator-celeborn] branch main updated: [CELEBORN-243][CELEBORN-245][IMPROVEMENT] Create push client failed and connection failed cause push failed should have their own ERROR type (#1181)

This is an automated email from the ASF dual-hosted git repository.

angerszhuuuu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 1311fb53 [CELEBORN-243][CELEBORN-245][IMPROVEMENT] Create push client failed and connection failed cause push failed should have their own ERROR type (#1181)
1311fb53 is described below

commit 1311fb53d1c959949007ee3751bf866457a1b74f
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Mon Jan 30 17:47:22 2023 +0800

    [CELEBORN-243][CELEBORN-245][IMPROVEMENT] Create push client failed and connection failed cause push failed should have their own ERROR type (#1181)
    
    * [CELEBORN-243][IMPROVEMENT] Create push client failed should have an ERROR type
---
 .../apache/celeborn/client/ShuffleClientImpl.java  | 74 ++++++++++++++++++----
 .../apache/celeborn/client/LifecycleManager.scala  | 22 ++++++-
 .../common/protocol/message/StatusCode.java        | 14 +++-
 .../org/apache/celeborn/common/util/Utils.scala    |  8 +++
 .../service/deploy/worker/PushDataHandler.scala    | 66 +++++++++++++------
 5 files changed, 150 insertions(+), 34 deletions(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 1de8840e..38cf7b68 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -739,12 +739,22 @@ public class ShuffleClientImpl extends ShuffleClient {
 
             @Override
             public void onFailure(Throwable e) {
+              StatusCode cause = getPushDataFailCause(e.getMessage());
+
               if (pushState.exception.get() != null) {
                 return;
               }
 
               if (remainReviveTimes <= 0) {
-                callback.onFailure(e);
+                callback.onFailure(
+                    new Exception(
+                        cause.getMessage()
+                            + "! Push data to master worker of "
+                            + loc
+                            + " failed: "
+                            + e.getMessage(),
+                        e));
+
                 return;
               }
 
@@ -771,7 +781,7 @@ public class ShuffleClientImpl extends ShuffleClient {
                             loc,
                             this,
                             pushState,
-                            getPushDataFailCause(e.getMessage()),
+                            cause,
                             remainReviveTimes));
               } else {
                 pushState.removeBatch(nextBatchId, loc.hostAndPushPort());
@@ -793,12 +803,20 @@ public class ShuffleClientImpl extends ShuffleClient {
           ChannelFuture future = client.pushData(pushData, wrappedCallback);
           pushState.pushStarted(nextBatchId, future, wrappedCallback, loc.hostAndPushPort());
         } else {
-          throw new RuntimeException("Mock push data first time failed.");
+          wrappedCallback.onFailure(
+              new Exception(
+                  StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE.toString(),
+                  new RuntimeException("Mock push data first time failed.")));
         }
       } catch (Exception e) {
         logger.warn("PushData failed", e);
         wrappedCallback.onFailure(
-            new Exception(getPushDataFailCause(e.getMessage()).toString(), e));
+            new Exception(
+                StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER.getMessage()
+                    + "! "
+                    + e.getMessage()
+                    + ". "
+                    + loc));
       }
     } else {
       // add batch data
@@ -1079,11 +1097,19 @@ public class ShuffleClientImpl extends ShuffleClient {
 
           @Override
           public void onFailure(Throwable e) {
+            StatusCode cause = getPushDataFailCause(e.getMessage());
             if (pushState.exception.get() != null) {
               return;
             }
             if (remainReviveTimes <= 0) {
-              callback.onFailure(e);
+              callback.onFailure(
+                  new Exception(
+                      cause.getMessage()
+                          + "! Push data to master worker of "
+                          + hostPort
+                          + " failed: "
+                          + e.getMessage(),
+                      e));
               return;
             }
             logger.error(
@@ -1109,7 +1135,7 @@ public class ShuffleClientImpl extends ShuffleClient {
                           mapId,
                           attemptId,
                           batches,
-                          getPushDataFailCause(e.getMessage()),
+                          cause,
                           groupedBatchId,
                           remainReviveTimes - 1));
             }
@@ -1123,11 +1149,20 @@ public class ShuffleClientImpl extends ShuffleClient {
         ChannelFuture future = client.pushMergedData(mergedData, wrappedCallback);
         pushState.pushStarted(groupedBatchId, future, wrappedCallback, hostPort);
       } else {
-        throw new RuntimeException("Mock push merge data failed");
+        wrappedCallback.onFailure(
+            new Exception(
+                StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE.toString(),
+                new RuntimeException("Mock push merge data failed.")));
       }
     } catch (Exception e) {
       logger.warn("PushMergedData failed", e);
-      wrappedCallback.onFailure(new Exception(getPushDataFailCause(e.getMessage()).toString(), e));
+      wrappedCallback.onFailure(
+          new Exception(
+              StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER.getMessage()
+                  + "! "
+                  + e.getMessage()
+                  + ". "
+                  + hostPort));
     }
   }
 
@@ -1382,11 +1417,22 @@ public class ShuffleClientImpl extends ShuffleClient {
     StatusCode cause;
     if (message.startsWith(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage())) {
       cause = StatusCode.PUSH_DATA_FAIL_SLAVE;
-    } else if (message.startsWith(StatusCode.PUSH_DATA_FAIL_MASTER.getMessage())
-        || connectFail(message)) {
+    } else if (message.startsWith(StatusCode.PUSH_DATA_FAIL_MASTER.getMessage())) {
       cause = StatusCode.PUSH_DATA_FAIL_MASTER;
+    } else if (message.startsWith(
+        StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER.getMessage())) {
+      cause = StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER;
+    } else if (message.startsWith(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE.getMessage())) {
+      cause = StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE;
+    } else if (message.startsWith(StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER.getMessage())) {
+      cause = StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER;
+    } else if (message.startsWith(StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE.getMessage())) {
+      cause = StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE;
     } else if (message.startsWith(StatusCode.PUSH_DATA_TIMEOUT.getMessage())) {
       cause = StatusCode.PUSH_DATA_TIMEOUT;
+    } else if (connectFail(message)) {
+      // Raised when the connection to the master worker fails while pushing data.
+      cause = StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER;
     } else {
       cause = StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE;
     }
@@ -1516,7 +1562,13 @@ public class ShuffleClientImpl extends ShuffleClient {
       pushState.pushStarted(nextBatchId, future, callback, location.hostAndPushPort());
     } catch (Exception e) {
       logger.warn("PushData byteBuf failed", e);
-      callback.onFailure(new Exception(getPushDataFailCause(e.getMessage()).toString(), e));
+      callback.onFailure(
+          new Exception(
+              StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER.getMessage()
+                  + "! "
+                  + e.getMessage()
+                  + ". "
+                  + location));
     }
     return totalLength;
   }
diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 073b15bb..1f1fda0b 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -533,6 +533,20 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
         case StatusCode.PUSH_DATA_FAIL_SLAVE
             if oldPartition.getPeer != null && conf.blacklistSlaveEnabled =>
           blacklistPartitionWorker(oldPartition.getPeer, StatusCode.PUSH_DATA_FAIL_SLAVE)
+        case StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER =>
+          blacklistPartitionWorker(oldPartition, StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER)
+        case StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE
+            if oldPartition.getPeer != null && conf.blacklistSlaveEnabled =>
+          blacklistPartitionWorker(
+            oldPartition.getPeer,
+            StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE)
+        case StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER =>
+          blacklistPartitionWorker(oldPartition, StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER)
+        case StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE
+            if oldPartition.getPeer != null && conf.blacklistSlaveEnabled =>
+          blacklistPartitionWorker(
+            oldPartition.getPeer,
+            StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE)
         case _ =>
       }
     }
@@ -1071,7 +1085,13 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
         .filter { case (_, entry) =>
           val (statusCode, registerTime) = entry
           statusCode match {
-            case StatusCode.WORKER_SHUTDOWN | StatusCode.NO_AVAILABLE_WORKING_DIR | StatusCode.RESERVE_SLOTS_FAILED
+            case StatusCode.WORKER_SHUTDOWN |
+                StatusCode.NO_AVAILABLE_WORKING_DIR |
+                StatusCode.RESERVE_SLOTS_FAILED |
+                StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER |
+                StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE |
+                StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER |
+                StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE
                 if current - registerTime < workerExcludedExpireTimeout =>
               true
             case StatusCode.UNKNOWN_WORKER => true
diff --git a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
index 75068a87..bdea3983 100644
--- a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
+++ b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
@@ -68,7 +68,11 @@ public enum StatusCode {
   REGION_FINISH_FAIL_SLAVE(36),
   REGION_FINISH_FAIL_MASTER(37),
 
-  PUSH_DATA_TIMEOUT(38);
+  PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER(38),
+  PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE(39),
+  PUSH_DATA_CONNECTION_EXCEPTION_MASTER(40),
+  PUSH_DATA_CONNECTION_EXCEPTION_SLAVE(41),
+  PUSH_DATA_TIMEOUT(42);
 
   private final byte value;
 
@@ -87,6 +91,14 @@ public enum StatusCode {
       msg = "PushDataFailMaster";
     } else if (value == PUSH_DATA_FAIL_SLAVE.getValue()) {
       msg = "PushDataFailSlave";
+    } else if (value == PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER.getValue()) {
+      msg = "PushDataCreateConnectionFailMaster";
+    } else if (value == PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE.getValue()) {
+      msg = "PushDataCreateConnectionFailSlave";
+    } else if (value == PUSH_DATA_CONNECTION_EXCEPTION_MASTER.getValue()) {
+      msg = "PushDataConnectionExceptionMaster";
+    } else if (value == PUSH_DATA_CONNECTION_EXCEPTION_SLAVE.getValue()) {
+      msg = "PushDataConnectionExceptionSlave";
     } else if (value == PUSH_DATA_FAIL_NON_CRITICAL_CAUSE.getValue()) {
       msg = "PushDataFailNonCriticalCause";
     } else if (value == PUSH_DATA_FAIL_PARTITION_NOT_FOUND.getValue()) {
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 875b75fd..31a716dd 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -894,6 +894,14 @@ object Utils extends Logging {
         StatusCode.PUSH_DATA_SUCCESS_MASTER_CONGESTED
       case 31 =>
         StatusCode.PUSH_DATA_SUCCESS_SLAVE_CONGESTED
+      case 38 =>
+        StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER
+      case 39 =>
+        StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE
+      case 40 =>
+        StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER
+      case 41 =>
+        StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE
       case _ =>
         null
     }
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index cdcb426a..ccc254c5 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -171,8 +171,15 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       override def onFailure(e: Throwable): Unit = {
         logError(s"[handlePushData.onFailure] partitionLocation: $location", e)
         workerSource.incCounter(WorkerSource.PushDataFailCount)
-        callback.onFailure(
-          new Exception(s"${StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage()}! ${e.getMessage}", e))
+        // Thrown by the slave peer worker
+        if (e.getMessage.startsWith(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage)) {
+          callback.onFailure(e)
+        } else {
+          // Thrown by the connection
+          callback.onFailure(new Exception(
+            s"${StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE.getMessage}! Push data to peer of $location failed: ${e.getMessage}",
+            e))
+        }
       }
     }
 
@@ -268,8 +275,10 @@ class PushDataHandler extends BaseMessageHandler with Logging {
             peer.getReplicatePort)
           if (unavailablePeers.containsKey(peerWorker)) {
             pushData.body().release()
-            wrappedCallback.onFailure(
-              new Exception(s"Peer $peerWorker unavailable for $location!"))
+            workerSource.incCounter(WorkerSource.PushDataFailCount)
+            callback.onFailure(
+              new Exception(
+                s"${StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE.getMessage}! Peer $peerWorker unavailable for $location!"))
             return
           }
           try {
@@ -285,8 +294,11 @@ class PushDataHandler extends BaseMessageHandler with Logging {
             case e: Exception =>
               pushData.body().release()
               unavailablePeers.put(peerWorker, System.currentTimeMillis())
-              wrappedCallback.onFailure(
-                new Exception(s"Push data to peer $peerWorker failed for $location", e))
+              workerSource.incCounter(WorkerSource.PushDataFailCount)
+              callback.onFailure(
+                new Exception(
+                  s"${StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE.getMessage}! Create connection to peer $peerWorker failed for $location",
+                  e))
           }
         }
       })
@@ -334,6 +346,14 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       return
     }
 
+    val locations = pushMergedData.partitionUniqueIds.map { id =>
+      if (isMaster) {
+        partitionLocationInfo.getMasterLocation(shuffleKey, id)
+      } else {
+        partitionLocationInfo.getSlaveLocation(shuffleKey, id)
+      }
+    }
+
     val wrappedCallback = new RpcResponseCallback() {
       override def onSuccess(response: ByteBuffer): Unit = {
         if (isMaster) {
@@ -354,19 +374,20 @@ class PushDataHandler extends BaseMessageHandler with Logging {
 
       override def onFailure(e: Throwable): Unit = {
         workerSource.incCounter(WorkerSource.PushDataFailCount)
-        callback.onFailure(
-          new Exception(s"${StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage()}! ${e.getMessage}", e))
+        // Thrown by the slave peer worker
+        if (e.getMessage.startsWith(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage)) {
+          callback.onFailure(e)
+        } else {
+          // Thrown by the connection
+          callback.onFailure(new Exception(
+            s"${StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE.getMessage}! Push data to peer of ${locations.head} failed: ${e.getMessage}",
+            e))
+        }
       }
     }
 
     // find FileWriters responsible for the data
-    val locations = pushMergedData.partitionUniqueIds.map { id =>
-      val loc =
-        if (isMaster) {
-          partitionLocationInfo.getMasterLocation(shuffleKey, id)
-        } else {
-          partitionLocationInfo.getSlaveLocation(shuffleKey, id)
-        }
+    locations.foreach { loc =>
       if (loc == null) {
         val (mapId, attemptId) = getMapAttempt(body)
         // MapperAttempts for a shuffle exists after any CommitFiles request succeeds.
@@ -397,7 +418,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
             wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
           } else {
             val msg = s"Partition location wasn't found for task(shuffle $shuffleKey, map $mapId," +
-              s" attempt $attemptId, uniqueId $id)."
+              s" attempt $attemptId, uniqueId ${loc.getUniqueId})."
             logWarning(s"[handlePushMergedData] $msg")
             callback.onFailure(
               new Exception(StatusCode.PUSH_DATA_FAIL_PARTITION_NOT_FOUND.getMessage()))
@@ -405,7 +426,6 @@ class PushDataHandler extends BaseMessageHandler with Logging {
         }
         return
       }
-      loc
     }
 
     // During worker shutdown, worker will return HARD_SPLIT for all existed partition.
@@ -447,7 +467,9 @@ class PushDataHandler extends BaseMessageHandler with Logging {
             peer.getReplicatePort)
           if (unavailablePeers.containsKey(peerWorker)) {
             pushMergedData.body().release()
-            wrappedCallback.onFailure(new Exception(s"Peer $peerWorker unavailable for $location!"))
+            workerSource.incCounter(WorkerSource.PushDataFailCount)
+            callback.onFailure(new Exception(
+              s"${StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE.getMessage}! Peer $peerWorker unavailable for $location!"))
             return
           }
           try {
@@ -466,9 +488,11 @@ class PushDataHandler extends BaseMessageHandler with Logging {
             case e: Exception =>
               pushMergedData.body().release()
               unavailablePeers.put(peerWorker, System.currentTimeMillis())
-              wrappedCallback.onFailure(new Exception(
-                s"Push data to peer $peerWorker failed for $location",
-                e))
+              workerSource.incCounter(WorkerSource.PushDataFailCount)
+              callback.onFailure(
+                new Exception(
+                  s"${StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE.getMessage}! Create connection to peer $peerWorker failed for $location",
+                  e))
           }
         }
       })