Posted to issues@celeborn.apache.org by "AngersZhuuuu (via GitHub)" <gi...@apache.org> on 2023/04/04 04:08:30 UTC

[GitHub] [incubator-celeborn] AngersZhuuuu opened a new pull request, #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

AngersZhuuuu opened a new pull request, #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408

   ### What changes were proposed in this pull request?
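   Merge GetBlacklistResponse into HeartbeatFromApplication. The client sends its local blacklist with the application heartbeat, and the master returns the blacklist, unknown workers, and shutdown workers in HeartbeatFromApplicationResponse, instead of through a separate GetBlacklist round trip.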
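A minimal sketch of the merged request/response shape, to make the idea concrete. `WorkerId`, `AppHeartbeat`, `AppHeartbeatResponse` and `ToyMaster` are hypothetical simplified stand-ins, not Celeborn's actual `ControlMessages`; the rule used for "unknown workers" is likewise an assumption for illustration.

```scala
// Hypothetical, simplified stand-ins for HeartbeatFromApplication and
// HeartbeatFromApplicationResponse; the real messages live in ControlMessages.
final case class WorkerId(host: String, rpcPort: Int)

final case class AppHeartbeat(
    appId: String,
    totalWritten: Long,
    fileCount: Long,
    localBlacklist: List[WorkerId]) // carried by the heartbeat after the merge

final case class AppHeartbeatResponse(
    success: Boolean,
    blacklist: List[WorkerId],
    unknownWorkers: List[WorkerId],
    shutdownWorkers: List[WorkerId])

// Toy master: one round trip answers the heartbeat and returns blacklist information.
final class ToyMaster(known: Set[WorkerId], excluded: Set[WorkerId], shutdown: Set[WorkerId]) {
  def handle(hb: AppHeartbeat): AppHeartbeatResponse =
    AppHeartbeatResponse(
      success = true,
      blacklist = excluded.toList,
      // Assumption: "unknown" = blacklisted by the client but not registered with the master.
      unknownWorkers = hb.localBlacklist.filterNot(known.contains),
      shutdownWorkers = shutdown.toList)
}
```

The point of the design is that the heartbeat the client already sends periodically now also carries the blacklist exchange, so no second periodic request is needed.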
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] AngersZhuuuu commented on pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "AngersZhuuuu (via GitHub)" <gi...@apache.org>.
AngersZhuuuu commented on PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#issuecomment-1495407149

   @RexXiong @waitinfuture 



[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#discussion_r1161582806


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleBlacklistManager.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client
+
+import java.util.{function, HashSet => JHashSet, List => JList, Set => JSet}
+
+import scala.collection.JavaConverters._
+
+import org.apache.celeborn.client.LifecycleManager.ShuffleFailedWorkers
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.meta.WorkerInfo
+import org.apache.celeborn.common.protocol.PartitionLocation
+import org.apache.celeborn.common.protocol.message.ControlMessages.HeartbeatFromApplicationResponse
+import org.apache.celeborn.common.protocol.message.StatusCode
+
+class LifecycleBlacklistManager(
+    conf: CelebornConf,
+    lifecycleManager: LifecycleManager,
+    commitManager: CommitManager) extends Logging {

Review Comment:
   It's strange to pass CommitManager into LifecycleBlacklistManager. It seems the approach from your previous discussion would make the code cleaner; let's do it later.



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleBlacklistManager.scala:
##########
@@ -0,0 +1,160 @@
+package org.apache.celeborn.client
+
+import java.util.{function, HashSet => JHashSet, List => JList, Set => JSet}
+
+import scala.collection.JavaConverters._
+
+import org.apache.celeborn.client.LifecycleManager.ShuffleFailedWorkers
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.meta.WorkerInfo
+import org.apache.celeborn.common.protocol.PartitionLocation
+import org.apache.celeborn.common.protocol.message.ControlMessages.HeartbeatFromApplicationResponse
+import org.apache.celeborn.common.protocol.message.StatusCode
+
+class LifecycleBlacklistManager(
+    conf: CelebornConf,
+    lifecycleManager: LifecycleManager,
+    commitManager: CommitManager) extends Logging {
+  private val workerExcludedExpireTimeout = conf.workerExcludedExpireTimeout
+
+  // blacklist
+  val blacklist = new ShuffleFailedWorkers()
+  private val shuttingWorkers: JSet[WorkerInfo] = new JHashSet[WorkerInfo]()
+
+  def blacklistPartition(
+      shuffleId: Int,
+      oldPartition: PartitionLocation,
+      cause: StatusCode): Unit = {
+    val failedWorker = new ShuffleFailedWorkers()
+
+    def blacklistPartitionWorker(
+        partition: PartitionLocation,
+        statusCode: StatusCode): Unit = {
+      val tmpWorker = partition.getWorker
+      val worker =
+        lifecycleManager.workerSnapshots(shuffleId).keySet().asScala.find(_.equals(tmpWorker))
+      if (worker.isDefined) {
+        failedWorker.put(worker.get, (statusCode, System.currentTimeMillis()))
+      }
+    }
+
+    if (oldPartition != null) {
+      cause match {
+        case StatusCode.PUSH_DATA_WRITE_FAIL_MASTER =>
+          blacklistPartitionWorker(oldPartition, StatusCode.PUSH_DATA_WRITE_FAIL_MASTER)
+        case StatusCode.PUSH_DATA_WRITE_FAIL_SLAVE
+            if oldPartition.getPeer != null && conf.blacklistSlaveEnabled =>
+          blacklistPartitionWorker(oldPartition.getPeer, StatusCode.PUSH_DATA_WRITE_FAIL_SLAVE)
+        case StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER =>
+          blacklistPartitionWorker(oldPartition, StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER)
+        case StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE
+            if oldPartition.getPeer != null && conf.blacklistSlaveEnabled =>
+          blacklistPartitionWorker(
+            oldPartition.getPeer,
+            StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE)
+        case StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER =>
+          blacklistPartitionWorker(oldPartition, StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER)
+        case StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE
+            if oldPartition.getPeer != null && conf.blacklistSlaveEnabled =>
+          blacklistPartitionWorker(
+            oldPartition.getPeer,
+            StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE)
+        case StatusCode.PUSH_DATA_TIMEOUT_MASTER =>
+          blacklistPartitionWorker(oldPartition, StatusCode.PUSH_DATA_TIMEOUT_MASTER)
+        case StatusCode.PUSH_DATA_TIMEOUT_SLAVE
+            if oldPartition.getPeer != null && conf.blacklistSlaveEnabled =>
+          blacklistPartitionWorker(
+            oldPartition.getPeer,
+            StatusCode.PUSH_DATA_TIMEOUT_SLAVE)
+        case _ =>
+      }
+    }
+    if (!failedWorker.isEmpty) {
+      recordWorkerFailure(failedWorker)
+    }
+  }
+
+  def recordWorkerFailure(failures: ShuffleFailedWorkers): Unit = {
+    val failedWorker = new ShuffleFailedWorkers(failures)
+    logInfo(s"Report Worker Failure: ${failedWorker.asScala}, current blacklist $blacklist")
+    failedWorker.asScala.foreach { case (worker, (statusCode, registerTime)) =>
+      if (!blacklist.containsKey(worker)) {
+        blacklist.put(worker, (statusCode, registerTime))
+      } else {
+        statusCode match {
+          case StatusCode.WORKER_SHUTDOWN |
+              StatusCode.NO_AVAILABLE_WORKING_DIR |
+              StatusCode.RESERVE_SLOTS_FAILED |
+              StatusCode.UNKNOWN_WORKER =>
+            blacklist.put(worker, (statusCode, blacklist.get(worker)._2))
+          case _ => // Not cover
+        }
+      }
+    }
+  }
+
+  def handleHeartbeatResponse(res: HeartbeatFromApplicationResponse): Unit = {
+    if (res.statusCode == StatusCode.SUCCESS) {
+      logInfo(s"Received Blacklist from Master, blacklist: ${res.blacklist} " +
+        s"unknown workers: ${res.unknownWorkers}, shutdown workers: ${res.shutdownWorkers}")
+      resolveShutdownWorkers(res.shutdownWorkers)

Review Comment:
   Shall we rename shutdownWorkers in HeartbeatFromApplicationResponse to shuttingWorkers?
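The merge rule in `recordWorkerFailure` quoted above can be restated as a small pure function: a worker not yet blacklisted is added with its status and time; for an already-blacklisted worker, only WORKER_SHUTDOWN, NO_AVAILABLE_WORKING_DIR, RESERVE_SLOTS_FAILED and UNKNOWN_WORKER overwrite the status, and the original timestamp is kept. This is a sketch only: the `Status` objects are a hypothetical subset of `StatusCode`, and an immutable `Map` stands in for `ShuffleFailedWorkers`.

```scala
// Hypothetical subset of StatusCode values, for illustration only.
sealed trait Status
case object PushDataWriteFailMaster extends Status
case object WorkerShutdown extends Status
case object NoAvailableWorkingDir extends Status
case object ReserveSlotsFailed extends Status
case object UnknownWorker extends Status

object BlacklistMerge {
  // Statuses that may overwrite an existing entry, mirroring the match in recordWorkerFailure.
  private val overriding: Set[Status] =
    Set(WorkerShutdown, NoAvailableWorkingDir, ReserveSlotsFailed, UnknownWorker)

  /** Merges new failures into the blacklist; values are (status, first-recorded time in ms). */
  def merge[W](
      blacklist: Map[W, (Status, Long)],
      failures: Map[W, (Status, Long)]): Map[W, (Status, Long)] =
    failures.foldLeft(blacklist) { case (acc, (worker, (status, time))) =>
      acc.get(worker) match {
        case None => acc.updated(worker, (status, time)) // first failure: record as-is
        case Some((_, firstTime)) if overriding.contains(status) =>
          acc.updated(worker, (status, firstTime)) // update status, keep original time
        case Some(_) => acc // any other status leaves the entry unchanged
      }
    }
}
```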




[GitHub] [incubator-celeborn] AngersZhuuuu commented on pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "AngersZhuuuu (via GitHub)" <gi...@apache.org>.
AngersZhuuuu commented on PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#issuecomment-1502926824

   > > @waitinfuture @RexXiong How does the current version look? I'm wondering whether I should remove `shuttingWorkers` from GetBlacklistResponse...
   > 
   > I agree with removing it directly, since we don't need to maintain compatibility during major-version development.
   
   Done
   



[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "AngersZhuuuu (via GitHub)" <gi...@apache.org>.
AngersZhuuuu commented on code in PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#discussion_r1162360692


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleBlacklistManager.scala:
##########
@@ -0,0 +1,160 @@
+package org.apache.celeborn.client
+
+import java.util.{function, HashSet => JHashSet, List => JList, Set => JSet}
+
+import scala.collection.JavaConverters._
+
+import org.apache.celeborn.client.LifecycleManager.ShuffleFailedWorkers
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.meta.WorkerInfo
+import org.apache.celeborn.common.protocol.PartitionLocation
+import org.apache.celeborn.common.protocol.message.ControlMessages.HeartbeatFromApplicationResponse
+import org.apache.celeborn.common.protocol.message.StatusCode
+
+class LifecycleBlacklistManager(

Review Comment:
   Done




[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "RexXiong (via GitHub)" <gi...@apache.org>.
RexXiong commented on code in PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#discussion_r1162339644


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleBlacklistManager.scala:
##########
@@ -0,0 +1,160 @@
+package org.apache.celeborn.client
+
+import java.util.{function, HashSet => JHashSet, List => JList, Set => JSet}
+
+import scala.collection.JavaConverters._
+
+import org.apache.celeborn.client.LifecycleManager.ShuffleFailedWorkers
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.meta.WorkerInfo
+import org.apache.celeborn.common.protocol.PartitionLocation
+import org.apache.celeborn.common.protocol.message.ControlMessages.HeartbeatFromApplicationResponse
+import org.apache.celeborn.common.protocol.message.StatusCode
+
+class LifecycleBlacklistManager(
+    conf: CelebornConf,
+    lifecycleManager: LifecycleManager,
+    commitManager: CommitManager) extends Logging {
+  private val workerExcludedExpireTimeout = conf.workerExcludedExpireTimeout
+
+  // blacklist
+  val blacklist = new ShuffleFailedWorkers()
+  private val shuttingWorkers: JSet[WorkerInfo] = new JHashSet[WorkerInfo]()
+
+  def blacklistPartition(

Review Comment:
   I'd prefer the name `blacklistWorkerFromPartition`.



##########
common/src/main/proto/TransportMessages.proto:
##########
@@ -39,8 +39,8 @@ enum MessageType {
   APPLICATION_LOST = 18;
   APPLICATION_LOST_RESPONSE = 19;
   HEARTBEAT_FROM_APPLICATION = 20;
-  GET_BLACKLIST = 21;

Review Comment:
   Agree with @waitinfuture; we had better keep this.



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleBlacklistManager.scala:
##########
@@ -0,0 +1,160 @@
+package org.apache.celeborn.client
+
+import java.util.{function, HashSet => JHashSet, List => JList, Set => JSet}
+
+import scala.collection.JavaConverters._
+
+import org.apache.celeborn.client.LifecycleManager.ShuffleFailedWorkers
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.meta.WorkerInfo
+import org.apache.celeborn.common.protocol.PartitionLocation
+import org.apache.celeborn.common.protocol.message.ControlMessages.HeartbeatFromApplicationResponse
+import org.apache.celeborn.common.protocol.message.StatusCode
+
+class LifecycleBlacklistManager(
+    conf: CelebornConf,
+    lifecycleManager: LifecycleManager,
+    commitManager: CommitManager) extends Logging {
+  private val workerExcludedExpireTimeout = conf.workerExcludedExpireTimeout
+
+  // blacklist
+  val blacklist = new ShuffleFailedWorkers()
+  private val shuttingWorkers: JSet[WorkerInfo] = new JHashSet[WorkerInfo]()
+
+  def blacklistPartition(
+      shuffleId: Int,
+      oldPartition: PartitionLocation,
+      cause: StatusCode): Unit = {
+    val failedWorker = new ShuffleFailedWorkers()
+
+    def blacklistPartitionWorker(

Review Comment:
   How about `blacklistWorker(worker, statusCode)` instead?
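One way the suggested `blacklistWorker(worker, statusCode)` signature could shrink the eight-case match above: first decide which worker to blame (the partition's own worker for `*_MASTER` codes, its peer for `*_SLAVE` codes when slave blacklisting is enabled), then make a single call. This is a sketch under assumptions: `Loc` and `Code` are hypothetical stand-ins for `PartitionLocation` and `StatusCode`, `slaveEnabled` stands for `conf.blacklistSlaveEnabled`, and `Option` replaces the null checks.

```scala
// Hypothetical stand-ins for PartitionLocation and StatusCode.
final case class Loc(worker: String, peer: Option[Loc])

sealed trait Code
case object WriteFailMaster extends Code
case object WriteFailSlave extends Code
case object TimeoutMaster extends Code
case object TimeoutSlave extends Code
case object PushOk extends Code

object PushFailure {
  // Failures blamed on the partition's own worker vs. on its replica (peer).
  private val masterSide: Set[Code] = Set(WriteFailMaster, TimeoutMaster)
  private val slaveSide: Set[Code] = Set(WriteFailSlave, TimeoutSlave)

  /** Picks the worker to blacklist for a push failure, if any. */
  def workerToBlacklist(loc: Loc, cause: Code, slaveEnabled: Boolean): Option[String] =
    if (masterSide.contains(cause)) Some(loc.worker)
    else if (slaveSide.contains(cause) && slaveEnabled) loc.peer.map(_.worker)
    else None

  /** Single call site for the suggested `blacklistWorker(worker, statusCode)`. */
  def blacklistWorkerFromPartition(
      loc: Loc,
      cause: Code,
      slaveEnabled: Boolean,
      blacklistWorker: (String, Code) => Unit): Unit =
    workerToBlacklist(loc, cause, slaveEnabled).foreach(w => blacklistWorker(w, cause))
}
```

Adding a new push-failure code then means adding it to one of the two sets rather than writing another `case` branch.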



##########
client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala:
##########
@@ -48,9 +51,17 @@ class ApplicationHeartbeater(
             val (tmpTotalWritten, tmpFileCount) = shuffleMetrics()
             logDebug(s"Send app heartbeat with $tmpTotalWritten $tmpFileCount")
             val appHeartbeat =
-              HeartbeatFromApplication(appId, tmpTotalWritten, tmpFileCount, ZERO_UUID)
-            rssHARetryClient.send(appHeartbeat)
-            logDebug("Successfully send app heartbeat.")
+              HeartbeatFromApplication(
+                appId,
+                tmpTotalWritten,
+                tmpFileCount,
+                blacklistManager.blacklist.asScala.keys.toList.asJava,
+                ZERO_UUID)
+            val response = requestHeartbeat(appHeartbeat)
+            if (response.statusCode == StatusCode.SUCCESS) {
+              logDebug("Successfully send app heartbeat.")
+            }
+            blacklistManager.handleHeartbeatResponse(response)

Review Comment:
   What happens when `response.statusCode` is not SUCCESS? It would be better to move this line into the `if` branch.
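A minimal sketch of the restructuring suggested here, where the reply is handed to the blacklist handler only when the master reports success and a failure is just logged. `HeartbeatReply`, `HeartbeaterSketch`, `sendHeartbeat` and `onBlacklistUpdate` are hypothetical names for illustration, not the PR's code.

```scala
// Hypothetical, simplified reply type; the PR uses HeartbeatFromApplicationResponse.
final case class HeartbeatReply(success: Boolean, blacklist: List[String])

final class HeartbeaterSketch(
    sendHeartbeat: () => HeartbeatReply,       // e.g. a blocking RPC to the master
    onBlacklistUpdate: HeartbeatReply => Unit, // e.g. the blacklist manager's handler
    log: String => Unit) {

  def beatOnce(): Unit = {
    val response = sendHeartbeat()
    if (response.success) {
      log("Successfully sent app heartbeat.")
      // Only a successful reply is applied to the local blacklist state.
      onBlacklistUpdate(response)
    } else {
      log("App heartbeat failed; keeping the current blacklist.")
    }
  }
}
```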



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleBlacklistManager.scala:
##########
@@ -0,0 +1,160 @@
+package org.apache.celeborn.client
+
+import java.util.{function, HashSet => JHashSet, List => JList, Set => JSet}
+
+import scala.collection.JavaConverters._
+
+import org.apache.celeborn.client.LifecycleManager.ShuffleFailedWorkers
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.meta.WorkerInfo
+import org.apache.celeborn.common.protocol.PartitionLocation
+import org.apache.celeborn.common.protocol.message.ControlMessages.HeartbeatFromApplicationResponse
+import org.apache.celeborn.common.protocol.message.StatusCode
+
+class LifecycleBlacklistManager(

Review Comment:
   How about `WorkerStatusTracker`? Any worker status change will be reported to this tracker. (Currently blacklistManager is responsible for the blacklist and for worker status from heartbeats; later it will also be responsible for managing worker listeners.)
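A tracker that owns worker status should report each change once, even though every heartbeat reply repeats the full list of shutdown workers. A minimal sketch of that bookkeeping, assuming the tracker keeps a set like `shuttingWorkers` from the diff; `WorkerStatusTrackerSketch` and `recordShutdownWorkers` are hypothetical names, and this is not how the PR's `resolveShutdownWorkers` is necessarily implemented.

```scala
import scala.collection.mutable

// Hypothetical sketch; W is any worker identifier type.
final class WorkerStatusTrackerSketch[W] {
  // Workers already known to be shutting down (like `shuttingWorkers` in the diff).
  private val shuttingWorkers = mutable.Set.empty[W]

  /** Records the shutdown workers from one heartbeat reply and returns only the new ones. */
  def recordShutdownWorkers(reported: Set[W]): Set[W] = synchronized {
    val newlyShutting = reported.filterNot(shuttingWorkers.contains)
    shuttingWorkers ++= newlyShutting
    newlyShutting
  }

  def isShuttingDown(worker: W): Boolean = synchronized { shuttingWorkers.contains(worker) }
}
```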




[GitHub] [incubator-celeborn] AngersZhuuuu commented on pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "AngersZhuuuu (via GitHub)" <gi...@apache.org>.
AngersZhuuuu commented on PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#issuecomment-1502799850

   @waitinfuture @RexXiong How does the current version look? I'm wondering whether I should remove `shuttingWorkers` from GetBlacklistResponse...



[GitHub] [incubator-celeborn] AngersZhuuuu commented on pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "AngersZhuuuu (via GitHub)" <gi...@apache.org>.
AngersZhuuuu commented on PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#issuecomment-1500101198

   > Agree with the merge idea. Shall we also consider introducing a workerListener interface to monitor worker status changes, so that other components can react to them? In [CELEBORN-479](https://github.com/apache/incubator-celeborn/pull/1405) I need to register a worker listener to monitor unavailable workers, as I cannot pass resourceTracker (which belongs to the Flink plugin) to the LifecycleManager. With that, we could have a shutdownWorkerListener for CommitManager, an UnknownWorkerListener for Flink, a workerFailureListener for the blacklist, etc. What do you think?
   
   SGTM. We can add this listener later; it will make the code clearer.
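The listener idea discussed here could look roughly like this: one interface with a callback per kind of status change, and a registry that fans each change out to whoever registered (CommitManager for shutdowns, the Flink plugin for unknown workers, the blacklist for failures). This is a hypothetical sketch; `WorkerStatusListener` and `WorkerStatusListenerRegistry` are illustrative names, not Celeborn API at the time of this PR.

```scala
// Hypothetical listener interface sketching the idea above; not Celeborn API.
trait WorkerStatusListener[W] {
  def onShutdown(workers: Set[W]): Unit = () // e.g. CommitManager
  def onUnknown(workers: Set[W]): Unit = ()  // e.g. the Flink plugin's resource tracker
  def onFailure(workers: Set[W]): Unit = ()  // e.g. the blacklist
}

final class WorkerStatusListenerRegistry[W] {
  // Immutable list swapped under a lock, so notification can iterate without locking.
  @volatile private var listeners: List[WorkerStatusListener[W]] = Nil

  def register(listener: WorkerStatusListener[W]): Unit =
    synchronized { listeners = listeners :+ listener }

  def notifyShutdown(workers: Set[W]): Unit =
    if (workers.nonEmpty) listeners.foreach(_.onShutdown(workers))

  def notifyUnknown(workers: Set[W]): Unit =
    if (workers.nonEmpty) listeners.foreach(_.onUnknown(workers))

  def notifyFailure(workers: Set[W]): Unit =
    if (workers.nonEmpty) listeners.foreach(_.onFailure(workers))
}
```

Default no-op methods let each component override only the events it cares about, which also removes the need to pass CommitManager into the blacklist manager's constructor.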
   



[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "AngersZhuuuu (via GitHub)" <gi...@apache.org>.
AngersZhuuuu commented on code in PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#discussion_r1163684476


##########
client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala:
##########
@@ -0,0 +1,158 @@
+package org.apache.celeborn.client
+
+import java.util.{function, HashSet => JHashSet, List => JList, Set => JSet}

Review Comment:
   Removed
   




[GitHub] [incubator-celeborn] AngersZhuuuu merged pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "AngersZhuuuu (via GitHub)" <gi...@apache.org>.
AngersZhuuuu merged PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408



[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "AngersZhuuuu (via GitHub)" <gi...@apache.org>.
AngersZhuuuu commented on code in PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#discussion_r1162368056


##########
client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala:
##########
@@ -48,9 +51,17 @@ class ApplicationHeartbeater(
             val (tmpTotalWritten, tmpFileCount) = shuffleMetrics()
             logDebug(s"Send app heartbeat with $tmpTotalWritten $tmpFileCount")
             val appHeartbeat =
-              HeartbeatFromApplication(appId, tmpTotalWritten, tmpFileCount, ZERO_UUID)
-            rssHARetryClient.send(appHeartbeat)
-            logDebug("Successfully send app heartbeat.")
+              HeartbeatFromApplication(
+                appId,
+                tmpTotalWritten,
+                tmpFileCount,
+                blacklistManager.blacklist.asScala.keys.toList.asJava,
+                ZERO_UUID)
+            val response = requestHeartbeat(appHeartbeat)
+            if (response.statusCode == StatusCode.SUCCESS) {
+              logDebug("Successfully send app heartbeat.")
+            }
+            blacklistManager.handleHeartbeatResponse(response)

Review Comment:
   Done



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleBlacklistManager.scala:
##########
@@ -0,0 +1,160 @@
+package org.apache.celeborn.client
+
+import java.util.{function, HashSet => JHashSet, List => JList, Set => JSet}
+
+import scala.collection.JavaConverters._
+
+import org.apache.celeborn.client.LifecycleManager.ShuffleFailedWorkers
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.meta.WorkerInfo
+import org.apache.celeborn.common.protocol.PartitionLocation
+import org.apache.celeborn.common.protocol.message.ControlMessages.HeartbeatFromApplicationResponse
+import org.apache.celeborn.common.protocol.message.StatusCode
+
+class LifecycleBlacklistManager(
+    conf: CelebornConf,
+    lifecycleManager: LifecycleManager,
+    commitManager: CommitManager) extends Logging {
+  private val workerExcludedExpireTimeout = conf.workerExcludedExpireTimeout
+
+  // blacklist
+  val blacklist = new ShuffleFailedWorkers()
+  private val shuttingWorkers: JSet[WorkerInfo] = new JHashSet[WorkerInfo]()
+
+  def blacklistPartition(
+      shuffleId: Int,
+      oldPartition: PartitionLocation,
+      cause: StatusCode): Unit = {
+    val failedWorker = new ShuffleFailedWorkers()
+
+    def blacklistPartitionWorker(

Review Comment:
   Done




[GitHub] [incubator-celeborn] RexXiong commented on pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "RexXiong (via GitHub)" <gi...@apache.org>.
RexXiong commented on PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#issuecomment-1502885957

   > @waitinfuture @RexXiong How does the current version look? I'm wondering whether I should remove `shuttingWorkers` from GetBlacklistResponse...
   
   I agree with removing it directly, since we don't need to maintain compatibility during major-version development.



[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "AngersZhuuuu (via GitHub)" <gi...@apache.org>.
AngersZhuuuu commented on code in PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#discussion_r1162393504


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleBlacklistManager.scala:
##########
@@ -0,0 +1,160 @@
+package org.apache.celeborn.client
+
+import java.util.{function, HashSet => JHashSet, List => JList, Set => JSet}
+
+import scala.collection.JavaConverters._
+
+import org.apache.celeborn.client.LifecycleManager.ShuffleFailedWorkers
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.meta.WorkerInfo
+import org.apache.celeborn.common.protocol.PartitionLocation
+import org.apache.celeborn.common.protocol.message.ControlMessages.HeartbeatFromApplicationResponse
+import org.apache.celeborn.common.protocol.message.StatusCode
+
+class LifecycleBlacklistManager(
+    conf: CelebornConf,
+    lifecycleManager: LifecycleManager,
+    commitManager: CommitManager) extends Logging {
+  private val workerExcludedExpireTimeout = conf.workerExcludedExpireTimeout
+
+  // blacklist
+  val blacklist = new ShuffleFailedWorkers()
+  private val shuttingWorkers: JSet[WorkerInfo] = new JHashSet[WorkerInfo]()
+
+  def blacklistPartition(
+      shuffleId: Int,
+      oldPartition: PartitionLocation,
+      cause: StatusCode): Unit = {
+    val failedWorker = new ShuffleFailedWorkers()
+
+    def blacklistPartitionWorker(
+        partition: PartitionLocation,
+        statusCode: StatusCode): Unit = {
+      val tmpWorker = partition.getWorker
+      val worker =
+        lifecycleManager.workerSnapshots(shuffleId).keySet().asScala.find(_.equals(tmpWorker))
+      if (worker.isDefined) {
+        failedWorker.put(worker.get, (statusCode, System.currentTimeMillis()))
+      }
+    }
+
+    if (oldPartition != null) {
+      cause match {
+        case StatusCode.PUSH_DATA_WRITE_FAIL_MASTER =>
+          blacklistPartitionWorker(oldPartition, StatusCode.PUSH_DATA_WRITE_FAIL_MASTER)
+        case StatusCode.PUSH_DATA_WRITE_FAIL_SLAVE
+            if oldPartition.getPeer != null && conf.blacklistSlaveEnabled =>
+          blacklistPartitionWorker(oldPartition.getPeer, StatusCode.PUSH_DATA_WRITE_FAIL_SLAVE)
+        case StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER =>
+          blacklistPartitionWorker(oldPartition, StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER)
+        case StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE
+            if oldPartition.getPeer != null && conf.blacklistSlaveEnabled =>
+          blacklistPartitionWorker(
+            oldPartition.getPeer,
+            StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE)
+        case StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER =>
+          blacklistPartitionWorker(oldPartition, StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER)
+        case StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE
+            if oldPartition.getPeer != null && conf.blacklistSlaveEnabled =>
+          blacklistPartitionWorker(
+            oldPartition.getPeer,
+            StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE)
+        case StatusCode.PUSH_DATA_TIMEOUT_MASTER =>
+          blacklistPartitionWorker(oldPartition, StatusCode.PUSH_DATA_TIMEOUT_MASTER)
+        case StatusCode.PUSH_DATA_TIMEOUT_SLAVE
+            if oldPartition.getPeer != null && conf.blacklistSlaveEnabled =>
+          blacklistPartitionWorker(
+            oldPartition.getPeer,
+            StatusCode.PUSH_DATA_TIMEOUT_SLAVE)
+        case _ =>
+      }
+    }
+    if (!failedWorker.isEmpty) {
+      recordWorkerFailure(failedWorker)
+    }
+  }
+
+  def recordWorkerFailure(failures: ShuffleFailedWorkers): Unit = {
+    val failedWorker = new ShuffleFailedWorkers(failures)
+    logInfo(s"Report Worker Failure: ${failedWorker.asScala}, current blacklist $blacklist")
+    failedWorker.asScala.foreach { case (worker, (statusCode, registerTime)) =>
+      if (!blacklist.containsKey(worker)) {
+        blacklist.put(worker, (statusCode, registerTime))
+      } else {
+        statusCode match {
+          case StatusCode.WORKER_SHUTDOWN |
+              StatusCode.NO_AVAILABLE_WORKING_DIR |
+              StatusCode.RESERVE_SLOTS_FAILED |
+              StatusCode.UNKNOWN_WORKER =>
+            blacklist.put(worker, (statusCode, blacklist.get(worker)._2))
+          case _ => // Not cover
+        }
+      }
+    }
+  }
+
+  def handleHeartbeatResponse(res: HeartbeatFromApplicationResponse): Unit = {
+    if (res.statusCode == StatusCode.SUCCESS) {
+      logInfo(s"Received Blacklist from Master, blacklist: ${res.blacklist} " +
+        s"unknown workers: ${res.unknownWorkers}, shutdown workers: ${res.shutdownWorkers}")
+      resolveShutdownWorkers(res.shutdownWorkers)

Review Comment:
   Done
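
   For readers following the diff above, here is a minimal, simplified sketch of how `blacklistPartition` and `recordWorkerFailure` appear to pick and merge blacklist entries. It is not the PR's code: `Status`, `Loc`, and `BlacklistSketch` are hypothetical stand-ins for `StatusCode`, `PartitionLocation`, and `LifecycleBlacklistManager`; only a few push-failure codes are modeled; the `workerSnapshots` lookup is omitted; and `WorkerShutdown` stands in for the four codes (`WORKER_SHUTDOWN`, `NO_AVAILABLE_WORKING_DIR`, `RESERVE_SLOTS_FAILED`, `UNKNOWN_WORKER`) that may overwrite an existing entry.

   ```scala
   import scala.collection.mutable

   // Hypothetical stand-ins for StatusCode (only a few codes are modeled).
   sealed trait Status
   case object WriteFailMaster extends Status
   case object WriteFailSlave extends Status
   case object TimeoutMaster extends Status
   case object TimeoutSlave extends Status
   case object WorkerShutdown extends Status

   // Hypothetical stand-in for PartitionLocation: a worker plus an optional peer replica.
   final case class Loc(worker: String, peer: Option[Loc])

   object BlacklistSketch {
     // worker -> (status code, time the worker was first blacklisted)
     val blacklist = mutable.Map.empty[String, (Status, Long)]

     // Codes allowed to replace the status of an already-blacklisted worker.
     private val overridable: Set[Status] = Set(WorkerShutdown)

     // *_MASTER codes blame the location's own worker; *_SLAVE codes blame the
     // peer, but only if a peer exists and slave blacklisting is enabled.
     def failedWorker(loc: Loc, cause: Status, blacklistSlave: Boolean): Option[String] =
       cause match {
         case WriteFailMaster | TimeoutMaster                 => Some(loc.worker)
         case WriteFailSlave | TimeoutSlave if blacklistSlave => loc.peer.map(_.worker)
         case _                                               => None
       }

     // New workers are inserted; a known worker only gets its status replaced
     // by an overridable code, and it keeps its original timestamp.
     def record(worker: String, status: Status, now: Long): Unit =
       blacklist.get(worker) match {
         case None => blacklist(worker) = (status, now)
         case Some((_, firstSeen)) if overridable(status) =>
           blacklist(worker) = (status, firstSeen)
         case Some(_) => () // other codes: keep the existing entry unchanged
       }
   }
   ```

   Keeping the first timestamp means a worker's exclusion age is measured from its first recorded failure, even when a more severe status code arrives later.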




[GitHub] [incubator-celeborn] codecov[bot] commented on pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#issuecomment-1502780343

   ## [Codecov](https://codecov.io/gh/apache/incubator-celeborn/pull/1408?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1408](https://codecov.io/gh/apache/incubator-celeborn/pull/1408?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e327832) into [main](https://codecov.io/gh/apache/incubator-celeborn/commit/c3e8189e629ec6b125f663210dc3a0c41c5b3641?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c3e8189) will **decrease** coverage by `0.09%`.
   > The diff coverage is `0.00%`.
   
   > :exclamation: Current head e327832 differs from pull request most recent head 3bcc5d8. Consider uploading reports for the commit 3bcc5d8 to get more accurate results
   
   ```diff
   @@            Coverage Diff             @@
   ##             main    #1408      +/-   ##
   ==========================================
   - Coverage   45.01%   44.91%   -0.09%     
   ==========================================
     Files         164      164              
     Lines       10404    10379      -25     
     Branches     1057     1050       -7     
   ==========================================
   - Hits         4682     4661      -21     
   + Misses       5381     5378       -3     
   + Partials      341      340       -1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-celeborn/pull/1408?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...cala/org/apache/celeborn/common/CelebornConf.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/1408?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL0NlbGVib3JuQ29uZi5zY2FsYQ==) | `86.28% <ø> (-0.01%)` | :arrow_down: |
   | [...che/celeborn/common/metrics/source/RPCSource.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/1408?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL21ldHJpY3Mvc291cmNlL1JQQ1NvdXJjZS5zY2FsYQ==) | `59.78% <ø> (+0.43%)` | :arrow_up: |
   | [...born/common/protocol/message/ControlMessages.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/1408?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL3Byb3RvY29sL21lc3NhZ2UvQ29udHJvbE1lc3NhZ2VzLnNjYWxh) | `1.59% <0.00%> (+0.02%)` | :arrow_up: |
   
   ... and [6 files with indirect coverage changes](https://codecov.io/gh/apache/incubator-celeborn/pull/1408/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   



[GitHub] [incubator-celeborn] waitinfuture commented on pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#issuecomment-1501442485

   > ping @RexXiong @waitinfuture
   
   I will review this PR later today.



[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "AngersZhuuuu (via GitHub)" <gi...@apache.org>.
AngersZhuuuu commented on code in PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#discussion_r1162396466


##########
common/src/main/proto/TransportMessages.proto:
##########
@@ -39,8 +39,8 @@ enum MessageType {
   APPLICATION_LOST = 18;
   APPLICATION_LOST_RESPONSE = 19;
   HEARTBEAT_FROM_APPLICATION = 20;
-  GET_BLACKLIST = 21;

Review Comment:
   Done




[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "AngersZhuuuu (via GitHub)" <gi...@apache.org>.
AngersZhuuuu commented on code in PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#discussion_r1162360058


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleBlacklistManager.scala:
##########
@@ -0,0 +1,160 @@
+package org.apache.celeborn.client
+
+import java.util.{function, HashSet => JHashSet, List => JList, Set => JSet}
+
+import scala.collection.JavaConverters._
+
+import org.apache.celeborn.client.LifecycleManager.ShuffleFailedWorkers
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.meta.WorkerInfo
+import org.apache.celeborn.common.protocol.PartitionLocation
+import org.apache.celeborn.common.protocol.message.ControlMessages.HeartbeatFromApplicationResponse
+import org.apache.celeborn.common.protocol.message.StatusCode
+
+class LifecycleBlacklistManager(
+    conf: CelebornConf,
+    lifecycleManager: LifecycleManager,
+    commitManager: CommitManager) extends Logging {
+  private val workerExcludedExpireTimeout = conf.workerExcludedExpireTimeout
+
+  // blacklist
+  val blacklist = new ShuffleFailedWorkers()
+  private val shuttingWorkers: JSet[WorkerInfo] = new JHashSet[WorkerInfo]()
+
+  def blacklistPartition(

Review Comment:
   Done



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleBlacklistManager.scala:
##########
@@ -0,0 +1,160 @@
+package org.apache.celeborn.client
+
+import java.util.{function, HashSet => JHashSet, List => JList, Set => JSet}
+
+import scala.collection.JavaConverters._
+
+import org.apache.celeborn.client.LifecycleManager.ShuffleFailedWorkers
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.meta.WorkerInfo
+import org.apache.celeborn.common.protocol.PartitionLocation
+import org.apache.celeborn.common.protocol.message.ControlMessages.HeartbeatFromApplicationResponse
+import org.apache.celeborn.common.protocol.message.StatusCode
+
+class LifecycleBlacklistManager(
+    conf: CelebornConf,
+    lifecycleManager: LifecycleManager,
+    commitManager: CommitManager) extends Logging {
+  private val workerExcludedExpireTimeout = conf.workerExcludedExpireTimeout
+
+  // blacklist
+  val blacklist = new ShuffleFailedWorkers()
+  private val shuttingWorkers: JSet[WorkerInfo] = new JHashSet[WorkerInfo]()
+
+  def blacklistPartition(
+      shuffleId: Int,
+      oldPartition: PartitionLocation,
+      cause: StatusCode): Unit = {
+    val failedWorker = new ShuffleFailedWorkers()
+
+    def blacklistPartitionWorker(

Review Comment:
   Done




[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "AngersZhuuuu (via GitHub)" <gi...@apache.org>.
AngersZhuuuu commented on code in PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#discussion_r1162388906


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleBlacklistManager.scala:
##########
@@ -0,0 +1,160 @@
+package org.apache.celeborn.client
+
+import java.util.{function, HashSet => JHashSet, List => JList, Set => JSet}
+
+import scala.collection.JavaConverters._
+
+import org.apache.celeborn.client.LifecycleManager.ShuffleFailedWorkers
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.meta.WorkerInfo
+import org.apache.celeborn.common.protocol.PartitionLocation
+import org.apache.celeborn.common.protocol.message.ControlMessages.HeartbeatFromApplicationResponse
+import org.apache.celeborn.common.protocol.message.StatusCode
+
+class LifecycleBlacklistManager(
+    conf: CelebornConf,
+    lifecycleManager: LifecycleManager,
+    commitManager: CommitManager) extends Logging {

Review Comment:
   > It's strange to add CommitManager to LifecycleBlacklistManager. It seems your previous discussion would make the code cleaner; let's do it later.
   
   How about the current code? Later we will implement a listener in each component, so the components won't need to depend on each other.




[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#discussion_r1163566383


##########
client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala:
##########
@@ -0,0 +1,158 @@
+package org.apache.celeborn.client
+
+import java.util.{function, HashSet => JHashSet, List => JList, Set => JSet}

Review Comment:
   nit: `function` and `JList` are not used.




[GitHub] [incubator-celeborn] AngersZhuuuu commented on pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "AngersZhuuuu (via GitHub)" <gi...@apache.org>.
AngersZhuuuu commented on PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#issuecomment-1504461254

   Any more suggestions? ping @waitinfuture @RexXiong 



[GitHub] [incubator-celeborn] AngersZhuuuu commented on pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "AngersZhuuuu (via GitHub)" <gi...@apache.org>.
AngersZhuuuu commented on PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#issuecomment-1501367376

   ping @RexXiong @waitinfuture 



[GitHub] [incubator-celeborn] waitinfuture commented on pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#issuecomment-1501757703

   I'm afraid this PR will cause backward-compatibility issues. For example, an old client invokes `rssHARetryClient.askSync[GetBlacklistResponse](message, classOf[GetBlacklistResponse])`, but the server no longer recognizes `GetBlacklist`.
   How about retaining the GetBlacklist/GetBlacklistResponse messages and adding new HeartBeat/HeartBeatResponse messages?
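
   One way to read this suggestion, as a minimal sketch: the master keeps answering the legacy `GetBlacklist` request while also returning the same information in the heartbeat response, so old and new clients can coexist. The message shapes below, the `MasterEndpointSketch` class, and all field names are hypothetical simplifications, not Celeborn's actual `ControlMessages` or master endpoint; worker IDs are plain strings.

   ```scala
   // Hypothetical, simplified messages; the real ControlMessages carry different fields.
   sealed trait Message
   final case class GetBlacklist(localBlacklist: Set[String]) extends Message
   final case class HeartbeatFromApplication(appId: String, localBlacklist: Set[String])
     extends Message

   sealed trait Response
   final case class GetBlacklistResponse(blacklist: Set[String], unknownWorkers: Set[String])
     extends Response
   final case class HeartbeatFromApplicationResponse(
       blacklist: Set[String],
       unknownWorkers: Set[String],
       shutdownWorkers: Set[String])
     extends Response

   // Hypothetical master-side handler serving both the legacy RPC and the new heartbeat.
   class MasterEndpointSketch(
       blacklisted: Set[String],
       known: Set[String],
       shutdown: Set[String]) {
     // Workers the client still tracks that the master no longer knows about.
     private def unknown(local: Set[String]): Set[String] = local.diff(known)

     def receive(msg: Message): Response = msg match {
       // Old clients that still send GetBlacklist keep getting an answer.
       case GetBlacklist(local) =>
         GetBlacklistResponse(blacklisted, unknown(local))
       // New clients get the same data piggybacked on the heartbeat response.
       case HeartbeatFromApplication(_, local) =>
         HeartbeatFromApplicationResponse(blacklisted, unknown(local), shutdown)
     }
   }
   ```

   The cost of this approach is carrying two code paths that return the same data until old clients are no longer supported.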



[GitHub] [incubator-celeborn] RexXiong commented on pull request #1408: [CELEBORN-502][REFACTOR] Merge GetBlacklistResponse to HeartbeatFromApplication

Posted by "RexXiong (via GitHub)" <gi...@apache.org>.
RexXiong commented on PR #1408:
URL: https://github.com/apache/incubator-celeborn/pull/1408#issuecomment-1499880253

   I agree with the merge idea. Shall we also consider introducing a workerListener interface to monitor worker status changes, so that each component can react to them? In [CELEBORN-479](https://github.com/apache/incubator-celeborn/pull/1405) I need to register a worker listener to monitor unavailable workers, because I cannot pass resourceTracker (which belongs to the Flink plugin) to the LifecycleManager. With such an interface, we could have a shutdownWorkerListener for CommitManager, an UnknownWorkerListener for Flink, a workerFailureListener for the blacklist, etc. What do you think?
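
   A minimal sketch of the listener idea described above, assuming a tracker that receives the worker sets from each heartbeat response and fans them out to registered listeners. `WorkerStatusListener`, `WorkerStatusTrackerSketch`, and their method names are hypothetical, not an existing Celeborn API; worker IDs are plain strings.

   ```scala
   import scala.collection.mutable.ArrayBuffer

   // Hypothetical listener interface; each callback defaults to a no-op so a
   // component only overrides the events it cares about.
   trait WorkerStatusListener {
     def onShutdownWorkers(workers: Set[String]): Unit = ()
     def onUnknownWorkers(workers: Set[String]): Unit = ()
     def onWorkerFailures(workers: Set[String]): Unit = ()
   }

   // Hypothetical tracker that owns the listeners and dispatches heartbeat results.
   class WorkerStatusTrackerSketch {
     private val listeners = ArrayBuffer.empty[WorkerStatusListener]

     def registerListener(listener: WorkerStatusListener): Unit = listeners += listener

     // Notify every listener, skipping empty worker sets.
     def handleHeartbeat(shutdown: Set[String], unknown: Set[String], failed: Set[String]): Unit =
       listeners.foreach { listener =>
         if (shutdown.nonEmpty) listener.onShutdownWorkers(shutdown)
         if (unknown.nonEmpty) listener.onUnknownWorkers(unknown)
         if (failed.nonEmpty) listener.onWorkerFailures(failed)
       }
   }
   ```

   With this shape, CommitManager would override only `onShutdownWorkers` and the Flink plugin only `onUnknownWorkers`, without either holding a reference to the other.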
   
   

