You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@celeborn.apache.org by "AngersZhuuuu (via GitHub)" <gi...@apache.org> on 2023/04/20 09:09:10 UTC

[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #1441: [CELEBORN-537] Improve blacklist compute & minor fix for Flink

AngersZhuuuu commented on code in PR #1441:
URL: https://github.com/apache/incubator-celeborn/pull/1441#discussion_r1172293438


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -442,6 +442,9 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
 
     candidatesWorkers.removeAll(connectFailedWorkers.asScala.keys.toList.asJava)
     workerStatusTracker.recordWorkerFailure(connectFailedWorkers)
+    // if newly allocated from master and setupEndpoint success, we can remove worker from blacklist to
+    // improve the accuracy of the blacklist

Review Comment:
   ```
    // If the workers newly allocated by the master set up their endpoints successfully, LifecycleManager should remove them from the blacklist to improve the blacklist's accuracy.
   ```



##########
client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala:
##########
@@ -142,32 +146,35 @@ class WorkerStatusTracker(
                 StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE |
                 StatusCode.PUSH_DATA_TIMEOUT_MASTER |
                 StatusCode.PUSH_DATA_TIMEOUT_SLAVE
-                if current - registerTime < workerExcludedExpireTimeout =>
-              true
-            case StatusCode.UNKNOWN_WORKER => true
-            case _ => false
-          }
-        }.asJava
-      val reservedBlackList = new ShuffleFailedWorkers()
-      reservedBlackList.putAll(reserved)
-      blacklist.clear()
-      blacklist.putAll(
-        res.blacklist.asScala.map(_ -> (StatusCode.WORKER_IN_BLACKLIST -> current)).toMap.asJava)
-      blacklist.putAll(
-        res.unknownWorkers.asScala.map(_ -> (StatusCode.UNKNOWN_WORKER -> current)).toMap.asJava)
-      // put reserved blacklist at last to cover blacklist's local status.
-      blacklist.putAll(reservedBlackList)
-
-      val workerStatus = new WorkersStatus(res.unknownWorkers, newShutdownWorkers)
-      workerStatusListeners.asScala.foreach {
-        listener =>
-          try {
-            listener.notifyChangedWorkersStatus(workerStatus)
-          } catch {
-            case t: Throwable =>
-              logError("Error while notify listener", t)
+                if current - registerTime < workerExcludedExpireTimeout => // reserve
+            case _ => blacklist.remove(workerInfo)
           }
       }
+
+      if (!res.blacklist.isEmpty) {
+        blacklist.putAll(res.blacklist.asScala.filterNot(e => blacklist.containsKey(e)).map(
+          _ -> (StatusCode.WORKER_IN_BLACKLIST -> current)).toMap.asJava)
+      }
+
+      if (!res.unknownWorkers.isEmpty || !newShutdownWorkers.isEmpty) {
+        blacklist.putAll(res.unknownWorkers.asScala.filterNot(e => blacklist.containsKey(e)).map(
+          _ -> (StatusCode.UNKNOWN_WORKER -> current)).toMap.asJava)
+        blacklist.putAll(res.shuttingWorkers.asScala.filterNot(e => blacklist.containsKey(e)).map(
+          _ -> (StatusCode.WORKER_SHUTDOWN -> current)).toMap.asJava)
+
+        val workerStatus = new WorkersStatus(res.unknownWorkers, newShutdownWorkers)
+        workerStatusListeners.asScala.foreach {
+          listener =>
+            try {
+              listener.notifyChangedWorkersStatus(workerStatus)
+            } catch {
+              case t: Throwable =>
+                logError("Error while notify listener", t)
+            }
+        }

Review Comment:
   ```
   workerStatusListeners.asScala.foreach { listener =>
     try {
       listener.notifyChangedWorkersStatus(workerStatus)
     } catch {
       case t: Throwable =>
         logError("Error while notify listener", t)
     }
   }
   ```



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -442,6 +442,9 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
 
     candidatesWorkers.removeAll(connectFailedWorkers.asScala.keys.toList.asJava)
     workerStatusTracker.recordWorkerFailure(connectFailedWorkers)
+    // if newly allocated from master and setupEndpoint success, we can remove worker from blacklist to
+    // improve the accuracy of the blacklist
+    workerStatusTracker.removeFromBlacklist(candidatesWorkers)

Review Comment:
   LGTM; I was planning to do this as well.



##########
client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala:
##########
@@ -123,17 +123,21 @@ class WorkerStatusTracker(
     }
   }
 
+  def removeFromBlacklist(workers: JHashSet[WorkerInfo]): Unit = {
+    blacklist.keySet.removeAll(workers)

Review Comment:
   Why use keySet here?



##########
client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala:
##########
@@ -142,32 +146,35 @@ class WorkerStatusTracker(
                 StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE |
                 StatusCode.PUSH_DATA_TIMEOUT_MASTER |
                 StatusCode.PUSH_DATA_TIMEOUT_SLAVE
-                if current - registerTime < workerExcludedExpireTimeout =>
-              true
-            case StatusCode.UNKNOWN_WORKER => true
-            case _ => false
-          }
-        }.asJava
-      val reservedBlackList = new ShuffleFailedWorkers()
-      reservedBlackList.putAll(reserved)
-      blacklist.clear()
-      blacklist.putAll(
-        res.blacklist.asScala.map(_ -> (StatusCode.WORKER_IN_BLACKLIST -> current)).toMap.asJava)
-      blacklist.putAll(
-        res.unknownWorkers.asScala.map(_ -> (StatusCode.UNKNOWN_WORKER -> current)).toMap.asJava)
-      // put reserved blacklist at last to cover blacklist's local status.
-      blacklist.putAll(reservedBlackList)
-
-      val workerStatus = new WorkersStatus(res.unknownWorkers, newShutdownWorkers)
-      workerStatusListeners.asScala.foreach {
-        listener =>
-          try {
-            listener.notifyChangedWorkersStatus(workerStatus)
-          } catch {
-            case t: Throwable =>
-              logError("Error while notify listener", t)
+                if current - registerTime < workerExcludedExpireTimeout => // reserve
+            case _ => blacklist.remove(workerInfo)
           }
       }
+
+      if (!res.blacklist.isEmpty) {
+        blacklist.putAll(res.blacklist.asScala.filterNot(e => blacklist.containsKey(e)).map(
+          _ -> (StatusCode.WORKER_IN_BLACKLIST -> current)).toMap.asJava)
+      }
+
+      if (!res.unknownWorkers.isEmpty || !newShutdownWorkers.isEmpty) {
+        blacklist.putAll(res.unknownWorkers.asScala.filterNot(e => blacklist.containsKey(e)).map(
+          _ -> (StatusCode.UNKNOWN_WORKER -> current)).toMap.asJava)
+        blacklist.putAll(res.shuttingWorkers.asScala.filterNot(e => blacklist.containsKey(e)).map(
+          _ -> (StatusCode.WORKER_SHUTDOWN -> current)).toMap.asJava)

Review Comment:
   This would be cleaner as:
   ```
   blacklist.putAll(res.unknownWorkers.asScala.filterNot(blacklist.containsKey)
     .map(_ -> (StatusCode.UNKNOWN_WORKER -> current)).toMap.asJava)
   blacklist.putAll(res.shuttingWorkers.asScala.filterNot(blacklist.containsKey)
     .map(_ -> (StatusCode.WORKER_SHUTDOWN -> current)).toMap.asJava)
   ```



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -502,6 +505,12 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
       return
     }
 
+    if (getPartitionType(shuffleId) == PartitionType.MAP) {
+      logError(s"[handleRevive] shuffle $shuffleId revived filed, because map partition don't support revive!")

Review Comment:
   Do we really need to log an error message here? Wouldn't a comment be enough?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org