You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Bang Xiao (JIRA)" <ji...@apache.org> on 2018/01/30 04:31:00 UTC

[jira] [Comment Edited] (SPARK-23252) When NodeManager and CoarseGrainedExecutorBackend processes are killed, the job will be blocked

    [ https://issues.apache.org/jira/browse/SPARK-23252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16344484#comment-16344484 ] 

Bang Xiao edited comment on SPARK-23252 at 1/30/18 4:30 AM:
------------------------------------------------------------

After the executor and NodeManager is killed, failure tasks never relaunched because of reason is not yet known.
{code:java}
CoarseGrainedSchedulerBackend.scala:

protected def disableExecutor(executorId: String): Boolean = {
    val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
      if (executorIsAlive(executorId)) {
        executorsPendingLossReason += executorId
        true
      } else {
        // Returns true for explicitly killed executors, we also need to get pending loss reasons;
        // For others return false.
        executorsPendingToRemove.contains(executorId)
      }
    }
    if (shouldDisable) {
      logInfo(s"Disabling executor $executorId.")
      scheduler.executorLost(executorId, LossReasonPending)
    }
    shouldDisable
  }{code}
TaskSchedulerImpl will handle executorLost and removeExecutor
{code:java}
TaskSchedulerImpl.scala:
private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
  // The tasks on the lost executor may not send any more status updates (because the executor
  // has been lost), so they should be cleaned up here.
  executorIdToRunningTaskIds.remove(executorId).foreach { taskIds =>
    logDebug("Cleaning up TaskScheduler state for tasks " +
      s"${taskIds.mkString("[", ",", "]")} on failed executor $executorId")
    // We do not notify the TaskSetManager of the task failures because that will
    // happen below in the rootPool.executorLost() call.
    taskIds.foreach(cleanupTaskState)
  }

  val host = executorIdToHost(executorId)
  val execs = hostToExecutors.getOrElse(host, new HashSet)
  execs -= executorId
  if (execs.isEmpty) {
    hostToExecutors -= host
    for (rack <- getRackForHost(host); hosts <- hostsByRack.get(rack)) {
      hosts -= host
      if (hosts.isEmpty) {
        hostsByRack -= rack
      }
    }
  }

  if (reason != LossReasonPending) {
    executorIdToHost -= executorId
    rootPool.executorLost(executorId, host, reason)
  }
  blacklistTrackerOpt.foreach(_.handleRemovedExecutor(executorId))
}
{code}
but if the reason is LossReasonPending, it will not trigger lost tasks relaunched.

This is consistent with what I've observed from the log.

 


was (Author: chopinxb):
After the executor and NodeManager is killed, failure tasks never relaunched because of reason is not yet known.
{code:java}
CoarseGrainedSchedulerBackend.scala:

protected def disableExecutor(executorId: String): Boolean = {
    val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
      if (executorIsAlive(executorId)) {
        executorsPendingLossReason += executorId
        true
      } else {
        // Returns true for explicitly killed executors, we also need to get pending loss reasons;
        // For others return false.
        executorsPendingToRemove.contains(executorId)
      }
    }
    if (shouldDisable) {
      logInfo(s"Disabling executor $executorId.")
      scheduler.executorLost(executorId, LossReasonPending)
    }
    shouldDisable
  }{code}
TaskSchedulerImpl will handle executorLost and removeExecutor
{code:java}
TaskSchedulerImpl.scala:
private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
  // The tasks on the lost executor may not send any more status updates (because the executor
  // has been lost), so they should be cleaned up here.
  executorIdToRunningTaskIds.remove(executorId).foreach { taskIds =>
    logDebug("Cleaning up TaskScheduler state for tasks " +
      s"${taskIds.mkString("[", ",", "]")} on failed executor $executorId")
    // We do not notify the TaskSetManager of the task failures because that will
    // happen below in the rootPool.executorLost() call.
    taskIds.foreach(cleanupTaskState)
  }

  val host = executorIdToHost(executorId)
  val execs = hostToExecutors.getOrElse(host, new HashSet)
  execs -= executorId
  if (execs.isEmpty) {
    hostToExecutors -= host
    for (rack <- getRackForHost(host); hosts <- hostsByRack.get(rack)) {
      hosts -= host
      if (hosts.isEmpty) {
        hostsByRack -= rack
      }
    }
  }

  if (reason != LossReasonPending) {
    executorIdToHost -= executorId
    rootPool.executorLost(executorId, host, reason)
  }
  blacklistTrackerOpt.foreach(_.handleRemovedExecutor(executorId))
}
{code}
but if the reason is LossReasonPending, it will not trigger lost tasks relaunched.

This is consistent with what I've observed from the log.

 

> When NodeManager and CoarseGrainedExecutorBackend processes are killed, the job will be blocked
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-23252
>                 URL: https://issues.apache.org/jira/browse/SPARK-23252
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Bang Xiao
>            Priority: Major
>
> This happens when 'spark.dynamicAllocation.enabled' is set to be 'true'. We use Yarn as our resource manager. 
> 1,spark-submit "JavaWordCount" application in yarn-client mode
> 2,   Kill NodeManager and CoarseGrainedExecutorBackend processes in one node when the job is in stage 0 
> if we just kill all CoarseGrainedExecutorBackend in that node, TaskSetManager will pending the failure task to resubmit. but if the NodeManager and CoarseGrainedExecutorBackend processes killed simultaneously,the whole job will be blocked. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org