Posted to issues@spark.apache.org by "Bang Xiao (JIRA)" <ji...@apache.org> on 2018/01/30 04:31:00 UTC
[jira] [Comment Edited] (SPARK-23252) When NodeManager and CoarseGrainedExecutorBackend processes are killed, the job will be blocked
[ https://issues.apache.org/jira/browse/SPARK-23252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16344484#comment-16344484 ]
Bang Xiao edited comment on SPARK-23252 at 1/30/18 4:30 AM:
------------------------------------------------------------
After the executor and the NodeManager are killed, the failed tasks are never relaunched because the executor's loss reason is not yet known:
{code:java}
// CoarseGrainedSchedulerBackend.scala
protected def disableExecutor(executorId: String): Boolean = {
  val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
    if (executorIsAlive(executorId)) {
      executorsPendingLossReason += executorId
      true
    } else {
      // Returns true for explicitly killed executors, we also need to get pending loss reasons;
      // For others return false.
      executorsPendingToRemove.contains(executorId)
    }
  }

  if (shouldDisable) {
    logInfo(s"Disabling executor $executorId.")
    scheduler.executorLost(executorId, LossReasonPending)
  }

  shouldDisable
}
{code}
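For context, LossReasonPending is only a placeholder: the driver parks the executor and expects a follow-up call carrying the real reason once the YARN side has found out why the container died. A self-contained sketch of that two-step flow (hypothetical names, not the actual Spark source):
{code:java}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Sketch (hypothetical names, NOT Spark source) of the two-step
// loss-reason protocol that disableExecutor starts.
object LossReasonHandshakeDemo {
  sealed trait ExecutorLossReason
  case object LossReasonPending extends ExecutorLossReason
  final case class SlaveLost(msg: String) extends ExecutorLossReason

  // Stand-in for scheduler.executorLost in TaskSchedulerImpl.
  def executorLost(id: String, reason: ExecutorLossReason): Unit =
    println(s"executorLost($id, $reason)")

  // Stand-in for asking the YARN side why the container died. If the
  // NodeManager was killed too, this answer can be delayed or never arrive.
  def askForLossReason(id: String): Future[ExecutorLossReason] =
    Future(SlaveLost("container lost"))

  def onExecutorDisconnected(id: String): Unit = {
    executorLost(id, LossReasonPending)               // step 1: park the executor
    askForLossReason(id).foreach(executorLost(id, _)) // step 2: the real reason
  }

  def main(args: Array[String]): Unit = {
    onExecutorDisconnected("exec-1")
    Thread.sleep(500) // let the async step-2 callback print before exiting
  }
}
{code}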
TaskSchedulerImpl then handles the executorLost call and invokes removeExecutor:
{code:java}
// TaskSchedulerImpl.scala
private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
  // The tasks on the lost executor may not send any more status updates (because the executor
  // has been lost), so they should be cleaned up here.
  executorIdToRunningTaskIds.remove(executorId).foreach { taskIds =>
    logDebug("Cleaning up TaskScheduler state for tasks " +
      s"${taskIds.mkString("[", ",", "]")} on failed executor $executorId")
    // We do not notify the TaskSetManager of the task failures because that will
    // happen below in the rootPool.executorLost() call.
    taskIds.foreach(cleanupTaskState)
  }

  val host = executorIdToHost(executorId)
  val execs = hostToExecutors.getOrElse(host, new HashSet)
  execs -= executorId
  if (execs.isEmpty) {
    hostToExecutors -= host
    for (rack <- getRackForHost(host); hosts <- hostsByRack.get(rack)) {
      hosts -= host
      if (hosts.isEmpty) {
        hostsByRack -= rack
      }
    }
  }

  if (reason != LossReasonPending) {
    executorIdToHost -= executorId
    rootPool.executorLost(executorId, host, reason)
  }
  blacklistTrackerOpt.foreach(_.handleRemovedExecutor(executorId))
}
{code}
But when the reason is LossReasonPending, rootPool.executorLost() is never reached, so the lost tasks are never resubmitted. This is consistent with what I observed in the logs.
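To spell out what the guard skips: rootPool.executorLost() fans out to the TaskSetManagers, and it is the TaskSetManager that marks the dead executor's tasks for resubmission (the source comment above says as much). A toy model of that chain, with hypothetical names:
{code:java}
// Toy model (not Spark source) of the chain that is skipped while the reason
// stays LossReasonPending: pool -> task set managers -> task resubmission.
object ExecutorLostChainDemo {
  final case class Task(id: Int, executorId: String)

  final class TaskSetManager(var running: List[Task]) {
    def executorLost(executorId: String): Unit = {
      val (lost, kept) = running.partition(_.executorId == executorId)
      running = kept
      lost.foreach(t => println(s"task ${t.id} marked for resubmission"))
    }
  }

  final class Pool(members: Seq[TaskSetManager]) {
    // Mirrors rootPool.executorLost fanning out to every TaskSetManager.
    def executorLost(executorId: String): Unit =
      members.foreach(_.executorLost(executorId))
  }

  def main(args: Array[String]): Unit = {
    val tsm = new TaskSetManager(List(Task(0, "exec-1"), Task(1, "exec-2")))
    val rootPool = new Pool(Seq(tsm))
    rootPool.executorLost("exec-1") // never reached while the reason is pending
  }
}
{code}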
> When NodeManager and CoarseGrainedExecutorBackend processes are killed, the job will be blocked
> -----------------------------------------------------------------------------------------------
>
> Key: SPARK-23252
> URL: https://issues.apache.org/jira/browse/SPARK-23252
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.2.0
> Reporter: Bang Xiao
> Priority: Major
>
> This happens when 'spark.dynamicAllocation.enabled' is set to 'true'. We use YARN as our resource manager.
> 1. spark-submit the "JavaWordCount" application in yarn-client mode.
> 2. Kill the NodeManager and CoarseGrainedExecutorBackend processes on one node while the job is in stage 0.
> If we only kill all CoarseGrainedExecutorBackend processes on that node, the TaskSetManager marks the failed tasks as pending and resubmits them. But if the NodeManager and CoarseGrainedExecutorBackend processes are killed simultaneously, the whole job is blocked.
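For reference, a minimal job of the same shape as the reproduction above (word count with dynamic allocation enabled). This is a hypothetical harness, not the JavaWordCount example itself: the app name and input path are placeholders, and killing the NodeManager and CoarseGrainedExecutorBackend processes is done out of band on the chosen node while stage 0 is running.
{code:java}
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical reproduction harness for SPARK-23252. Submit with
// --master yarn --deploy-mode client, then kill the NodeManager and
// CoarseGrainedExecutorBackend processes on one node during stage 0.
object Spark23252Repro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SPARK-23252-repro")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true") // required by dynamic allocation
    val sc = new SparkContext(conf)

    // A large input keeps stage 0 running long enough to kill the processes.
    val counts = sc.textFile("hdfs:///path/to/big/input") // placeholder path
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)

    println(s"distinct words: ${counts.count()}")
    sc.stop()
  }
}
{code}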