Posted to issues@spark.apache.org by "Zhen Fan (JIRA)" <ji...@apache.org> on 2019/04/01 01:15:00 UTC

[jira] [Updated] (SPARK-27082) Dynamic Allocation: we should consider the scenario that speculative task being killed and never resubmit

     [ https://issues.apache.org/jira/browse/SPARK-27082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhen Fan updated SPARK-27082:
-----------------------------
    Description: 
Issue background:

When dynamic allocation is enabled, we expect executors to be removed appropriately, especially in stages with data skew. With speculation also enabled, the speculative copy of a task can be killed once the original task succeeds, and vice versa. In that case, TaskSetManager sets successful(index)=true and never resubmits the killed task. However, ExecutorAllocationManager, which drives dynamic allocation, doesn't handle this scenario.
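For reference, the TaskSetManager behavior described above looks roughly like this (a paraphrased sketch, not the exact Spark source):
{code:java}
// Paraphrased sketch of TaskSetManager.handleSuccessfulTask: when one
// attempt of a task index succeeds, the mirror attempts are killed, and the
// index stays marked successful, so the killed attempts are never resubmitted.
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  val info = taskInfos(tid)
  val index = info.index
  // Kill any other running attempt of the same index (original or copy).
  for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
    sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId,
      interruptThread = true, reason = "another attempt succeeded")
  }
  if (!successful(index)) {
    successful(index) = true  // never reset, so the killed mirror is final
  }
  maybeFinishTaskSet()
}{code}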

See SPARK-8366. However, SPARK-8366 ignores the scenario where the speculative copy itself is killed. When this happens, TaskSetManager marks the task index of the stage as successful and never resubmits the killed task, so we shouldn't treat it as a pending task.

This distorts the computation of maxNumExecutorsNeeded; as a result, we retain unnecessary executors and waste cluster resources.
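For context, maxNumExecutorsNeeded is derived from the listener's task counts, roughly as sketched below (a simplified paraphrase of ExecutorAllocationManager, not the exact source; tasksPerExecutor stands in for the configured slots per executor, and the real code also applies an allocation ratio). A killed attempt that is still counted by totalPendingTasks() keeps this target inflated indefinitely:
{code:java}
// Simplified sketch, not the exact Spark source: the executor target is
// proportional to the number of pending plus running tasks.
private def maxNumExecutorsNeeded(): Int = {
  val totalTasks = listener.totalPendingTasks() + listener.totalRunningTasks()
  // One phantom pending task keeps this at >= 1 after the stage is done.
  math.ceil(totalTasks.toDouble / tasksPerExecutor).toInt
}{code}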

Solution:

When the task index has a speculative attempt recorded and its mirror task has already succeeded, we don't treat the killed attempt as a pending task.

The code below has been tested.
{code:java}
// Tracks, per stage, the task indices that have a speculative attempt,
// mapped to whether that index has already finished successfully.
private val stageIdToSpeculativeTaskIndices =
  new mutable.HashMap[Int, mutable.HashMap[Int, Boolean]]
...
val speculativeTaskIndices = stageIdToSpeculativeTaskIndices.get(stageId)
if (taskEnd.reason == Success) {
  // Record the success so that a later kill of the mirror attempt is not
  // counted as a pending task.
  if (speculativeTaskIndices.exists(_.contains(taskIndex))) {
    speculativeTaskIndices.get.update(taskIndex, true)
  }
} else {
  var resubmitTask = true
  if (taskEnd.taskInfo.killed) {
    // A killed attempt whose mirror already succeeded will never be
    // resubmitted by the TaskSetManager, so don't count it as pending.
    resubmitTask = !speculativeTaskIndices.exists(_.getOrElse(taskIndex, false))
  }

  if (resubmitTask) {
    if (totalPendingTasks() == 0) {
      allocationManager.onSchedulerBacklogged()
    }
    if (taskEnd.taskInfo.speculative) {
      stageIdToSpeculativeTaskIndices.get(stageId).foreach(_.remove(taskIndex))
    } else {
      stageIdToTaskIndices.get(stageId).foreach(_.remove(taskIndex))
    }
  }
}{code}
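For completeness, here is a hypothetical sketch of one way the map above could be populated; the actual patch may hook a different callback (for example the speculative-task-submitted event):
{code:java}
// Hypothetical sketch: record each speculative attempt's index when it
// starts, initialized to "not yet successful", so the onTaskEnd logic above
// can tell whether a killed attempt has a successful mirror.
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
  allocationManager.synchronized {
    if (taskStart.taskInfo.speculative) {
      stageIdToSpeculativeTaskIndices
        .getOrElseUpdate(taskStart.stageId, new mutable.HashMap[Int, Boolean])
        .getOrElseUpdate(taskStart.taskInfo.index, false)
    }
  }
}{code}
With this bookkeeping, the killed mirror of a successful attempt no longer re-enters the pending set, totalPendingTasks() can drain to zero, and idle executors are eventually released.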
Please take a look. Thanks.


> Dynamic Allocation: we should consider the scenario that speculative task being killed and never resubmit
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27082
>                 URL: https://issues.apache.org/jira/browse/SPARK-27082
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.0
>            Reporter: Zhen Fan
>            Priority: Major
>              Labels: patch
>


