Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/07/04 11:20:47 UTC

[GitHub] [spark] weixiuli opened a new pull request #28994: [SPARK-32170][CORE] Improve the speculation for the inefficient tasks by the task metrics.

weixiuli opened a new pull request #28994:
URL: https://github.com/apache/spark/pull/28994


   
   
   ### What changes were proposed in this pull request?
   Improve speculation by using task metrics to identify inefficient tasks.
   
   
   ### Why are the changes needed?
   
   1) Tasks are speculated once they meet certain conditions, regardless of whether they are inefficient, which can waste a large amount of cluster resources.
   2) In production, a speculative copy launched for an efficient task is eventually killed, which is unnecessary and wastes cluster resources. Sometimes it also interferes with the scheduling of other tasks.
   3) We should therefore first evaluate whether a task is inefficient, using the metrics of successful tasks, and then decide whether to speculate it. Inefficient tasks should be speculated and efficient ones should not, which makes better use of cluster resources.
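
As a rough illustration of point 3 (a minimal sketch, not the code in this PR): a baseline progress rate is derived from successful tasks, and a running task is compared against it. `TaskSample`, `SpeculationSketch`, and the "below the baseline" rule are all hypothetical names and assumptions made for this example.

```scala
// Hypothetical, simplified stand-in for per-task metrics (not a Spark type).
final case class TaskSample(recordsRead: Long, runTimeMs: Long)

object SpeculationSketch {
  // Records processed per second; 0.0 when no run time has been recorded.
  def progressRate(recordsRead: Long, runTimeMs: Long): Double =
    if (runTimeMs > 0) recordsRead / (runTimeMs / 1000.0) else 0.0

  // Aggregate rate over all successful tasks (total records / total seconds).
  def successRate(successful: Seq[TaskSample]): Double =
    progressRate(successful.map(_.recordsRead).sum, successful.map(_.runTimeMs).sum)

  // Assumption: a running task counts as inefficient when its rate is below
  // the successful-task baseline. With no usable baseline, nothing is flagged.
  def isInefficient(running: TaskSample, successful: Seq[TaskSample]): Boolean = {
    val baseline = successRate(successful)
    baseline > 0 && progressRate(running.recordsRead, running.runTimeMs) < baseline
  }
}
```

A real policy would probably scale the baseline by a tunable factor rather than compare against it directly; the strict comparison here only keeps the sketch short.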
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   
   Added unit tests.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #28994: [SPARK-32170][CORE] Improve the speculation for the inefficient tasks by the task metrics.

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #28994:
URL: https://github.com/apache/spark/pull/28994#issuecomment-653932065


   Can one of the admins verify this patch?




[GitHub] [spark] AmplabJenkins commented on pull request #28994: [SPARK-32170][CORE] Improve the speculation for the inefficient tasks by the task metrics.

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #28994:
URL: https://github.com/apache/spark/pull/28994#issuecomment-653931791


   Can one of the admins verify this patch?




[GitHub] [spark] github-actions[bot] closed pull request #28994: [SPARK-32170][CORE] Improve the speculation for the inefficient tasks by the task metrics.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #28994:
URL: https://github.com/apache/spark/pull/28994


   




[GitHub] [spark] weixiuli commented on pull request #28994: [SPARK-32170][CORE] Improve the speculation for the inefficient tasks by the task metrics.

Posted by GitBox <gi...@apache.org>.
weixiuli commented on pull request #28994:
URL: https://github.com/apache/spark/pull/28994#issuecomment-657384538


   @maropu  @cloud-fan @gatorsmile @mridulm @dongjoon-hyun Could you help check this PR? Thanks.




[GitHub] [spark] venkata91 commented on pull request #28994: [SPARK-32170][CORE] Improve the speculation for the inefficient tasks by the task metrics.

Posted by GitBox <gi...@apache.org>.
venkata91 commented on pull request #28994:
URL: https://github.com/apache/spark/pull/28994#issuecomment-659869847


   This is an interesting idea and a good start. Just considering the runTime of a task alone might not be useful in many cases.




[GitHub] [spark] mridulm commented on pull request #28994: [SPARK-32170][CORE] Improve the speculation for the inefficient tasks by the task metrics.

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #28994:
URL: https://github.com/apache/spark/pull/28994#issuecomment-655740354


   @venkata91 You might be interested in this.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #28994: [SPARK-32170][CORE] Improve the speculation for the inefficient tasks by the task metrics.

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #28994:
URL: https://github.com/apache/spark/pull/28994#issuecomment-653931791


   Can one of the admins verify this patch?




[GitHub] [spark] weixiuli edited a comment on pull request #28994: [SPARK-32170][CORE] Improve the speculation for the inefficient tasks by the task metrics.

Posted by GitBox <gi...@apache.org>.
weixiuli edited a comment on pull request #28994:
URL: https://github.com/apache/spark/pull/28994#issuecomment-653753745


   @cloud-fan @dongjoon-hyun kindly review, thanks.




[GitHub] [spark] github-actions[bot] commented on pull request #28994: [SPARK-32170][CORE] Improve the speculation for the inefficient tasks by the task metrics.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #28994:
URL: https://github.com/apache/spark/pull/28994#issuecomment-716247017


   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!




[GitHub] [spark] weixiuli commented on pull request #28994: [SPARK-32170][CORE] Improve the speculation for the inefficient tasks by the task metrics.

Posted by GitBox <gi...@apache.org>.
weixiuli commented on pull request #28994:
URL: https://github.com/apache/spark/pull/28994#issuecomment-653753745


   @cloud-fan kindly review, thanks.




[GitHub] [spark] venkata91 commented on a change in pull request #28994: [SPARK-32170][CORE] Improve the speculation for the inefficient tasks by the task metrics.

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #28994:
URL: https://github.com/apache/spark/pull/28994#discussion_r456228668



##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1125,6 +1142,78 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking inefficient tasks to be speculated, the inefficient tasks come from
+   * the tasks which may be speculated by the previous strategy.
+   */
+  private class InefficientTask {
+    private var taskData: Map[Long, TaskData] = null
+    private var successTaskProgress = 0.0
+    private val checkInefficientTask = speculationTaskMinDuration > 0
+
+    if (checkInefficientTask) {
+      val appStatusStore = sched.sc.statusTracker.getAppStatusStore
+      if (appStatusStore != null) {
+        successTaskProgress =
+          computeSuccessTaskProgress(taskSet.stageId, taskSet.stageAttemptId, appStatusStore)
+        val stageData = appStatusStore.stageAttempt(taskSet.stageId, taskSet.stageAttemptId, true)
+        if (stageData != null) {
+          taskData = stageData._1.tasks.orNull
+        }
+      }
+    }
+
+    private def computeSuccessTaskProgress(stageId: Int, stageAttemptId: Int,
+      appStatusStore: AppStatusStore): Double = {
+      var sumInputRecords, sumShuffleReadRecords, sumExecutorRunTime = 0.0
+      appStatusStore.taskList(stageId, stageAttemptId, Int.MaxValue).filter {
+        _.status == "SUCCESS"
+      }.map(_.taskMetrics).filter(_.isDefined).map(_.get).foreach { task =>
+        if (task.inputMetrics != null) {
+          sumInputRecords += task.inputMetrics.recordsRead
+        }

Review comment:
       How about recordsWritten? Should it also count toward progress, and likewise shuffleRecordsWritten?
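
One way to picture the suggestion in the comment above, as a sketch only: count written records alongside read records when computing the successful-task rate. `RecordCounts` and `ProgressWithWrites` are hypothetical; the field names echo the counters under discussion and are not Spark's actual metrics API.

```scala
// Hypothetical flattened view of one successful task's counters (not a Spark type).
final case class RecordCounts(
    inputRead: Long,
    shuffleRead: Long,
    outputWritten: Long,
    shuffleWritten: Long,
    executorRunTimeMs: Long)

object ProgressWithWrites {
  // Records handled (read plus written) per second across successful tasks.
  def successTaskProgress(tasks: Seq[RecordCounts]): Double = {
    val records = tasks.map { t =>
      t.inputRead + t.shuffleRead + t.outputWritten + t.shuffleWritten
    }.sum
    val runTimeMs = tasks.map(_.executorRunTimeMs).sum
    if (runTimeMs > 0) records / (runTimeMs / 1000.0) else 0.0
  }
}
```

Summing reads and writes with equal weight is itself an assumption; a map-side task that reads little but writes a lot would otherwise report a rate of zero.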

##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1125,6 +1142,78 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking inefficient tasks to be speculated, the inefficient tasks come from
+   * the tasks which may be speculated by the previous strategy.
+   */
+  private class InefficientTask {
+    private var taskData: Map[Long, TaskData] = null
+    private var successTaskProgress = 0.0
+    private val checkInefficientTask = speculationTaskMinDuration > 0
+
+    if (checkInefficientTask) {
+      val appStatusStore = sched.sc.statusTracker.getAppStatusStore
+      if (appStatusStore != null) {
+        successTaskProgress =
+          computeSuccessTaskProgress(taskSet.stageId, taskSet.stageAttemptId, appStatusStore)
+        val stageData = appStatusStore.stageAttempt(taskSet.stageId, taskSet.stageAttemptId, true)
+        if (stageData != null) {
+          taskData = stageData._1.tasks.orNull
+        }
+      }
+    }
+
+    private def computeSuccessTaskProgress(stageId: Int, stageAttemptId: Int,
+      appStatusStore: AppStatusStore): Double = {
+      var sumInputRecords, sumShuffleReadRecords, sumExecutorRunTime = 0.0
+      appStatusStore.taskList(stageId, stageAttemptId, Int.MaxValue).filter {
+        _.status == "SUCCESS"
+      }.map(_.taskMetrics).filter(_.isDefined).map(_.get).foreach { task =>
+        if (task.inputMetrics != null) {
+          sumInputRecords += task.inputMetrics.recordsRead
+        }
+        if (task.shuffleReadMetrics != null) {
+          sumShuffleReadRecords += task.shuffleReadMetrics.recordsRead
+        }
+        sumExecutorRunTime += task.executorRunTime
+      }
+      if (sumExecutorRunTime > 0) {
+        (sumInputRecords + sumShuffleReadRecords) / (sumExecutorRunTime / 1000.0)
+      } else 0
+    }
+
+    def maySpeculateTask(tid: Long, runtimeMs: Long, taskInfo: TaskInfo): Boolean = {
+      // note: 1) only check inefficient tasks when 'SPECULATION_TASK_DURATION_THRESHOLD' > 0.
+      // 2) some tasks may have neither input records nor shuffleRead records, so
+      // the 'successTaskProgress' may be zero all the time, this case we should not consider,
+      // eg: some spark-sql like that 'msck repair table' or 'drop table' and so on.
+      if (!checkInefficientTask || successTaskProgress <= 0) {
+        true
+      } else if (runtimeMs < speculationTaskMinDuration) {
+        false
+      } else if (taskData != null && taskData.contains(tid) && taskData(tid) != null &&
+        taskData(tid).taskMetrics.isDefined) {
+        val taskMetrics = taskData(tid).taskMetrics.get
+        val currentTaskProgressRate = (taskMetrics.inputMetrics.recordsRead +

Review comment:
       Would it make sense to add taskProgress as part of taskMetrics, so that it can also be shown in the Spark UI? Although taskProgress for tasks that don't involve input/output/shuffle records would be hard to measure.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1125,6 +1142,78 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking inefficient tasks to be speculated, the inefficient tasks come from
+   * the tasks which may be speculated by the previous strategy.
+   */
+  private class InefficientTask {
+    private var taskData: Map[Long, TaskData] = null
+    private var successTaskProgress = 0.0
+    private val checkInefficientTask = speculationTaskMinDuration > 0
+
+    if (checkInefficientTask) {
+      val appStatusStore = sched.sc.statusTracker.getAppStatusStore
+      if (appStatusStore != null) {
+        successTaskProgress =
+          computeSuccessTaskProgress(taskSet.stageId, taskSet.stageAttemptId, appStatusStore)
+        val stageData = appStatusStore.stageAttempt(taskSet.stageId, taskSet.stageAttemptId, true)
+        if (stageData != null) {
+          taskData = stageData._1.tasks.orNull
+        }
+      }
+    }
+
+    private def computeSuccessTaskProgress(stageId: Int, stageAttemptId: Int,
+      appStatusStore: AppStatusStore): Double = {
+      var sumInputRecords, sumShuffleReadRecords, sumExecutorRunTime = 0.0
+      appStatusStore.taskList(stageId, stageAttemptId, Int.MaxValue).filter {
+        _.status == "SUCCESS"
+      }.map(_.taskMetrics).filter(_.isDefined).map(_.get).foreach { task =>
+        if (task.inputMetrics != null) {
+          sumInputRecords += task.inputMetrics.recordsRead
+        }

Review comment:
       Caching can also take time when data is written to disk; does that need to be taken into consideration? Similarly, GC time, shuffle read blocked time, etc. could also impact task progress.
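
To make the concern above concrete, here is a hedged sketch of one possible adjustment: compute the rate over run time with GC time and shuffle-fetch wait time subtracted. `TimedTask` and `AdjustedProgress` are hypothetical, and whether subtracting these times is the right policy is an open question rather than something this PR settles.

```scala
// Hypothetical per-task timing sample (not a Spark type); all times in milliseconds.
final case class TimedTask(
    recordsRead: Long,
    executorRunTimeMs: Long,
    jvmGcTimeMs: Long,
    fetchWaitTimeMs: Long)

object AdjustedProgress {
  // Run time minus GC and blocked-fetch time, floored at zero.
  def effectiveRunTimeMs(t: TimedTask): Long =
    math.max(0L, t.executorRunTimeMs - t.jvmGcTimeMs - t.fetchWaitTimeMs)

  // Records per second over the effective run time; 0.0 if nothing remains.
  def adjustedRate(t: TimedTask): Double = {
    val ms = effectiveRunTimeMs(t)
    if (ms > 0) t.recordsRead / (ms / 1000.0) else 0.0
  }
}
```

With this adjustment a task that spent half its time in GC is judged on the other half, so it would look less inefficient than under a raw run-time rate.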






[GitHub] [spark] venkata91 edited a comment on pull request #28994: [SPARK-32170][CORE] Improve the speculation for the inefficient tasks by the task metrics.

Posted by GitBox <gi...@apache.org>.
venkata91 edited a comment on pull request #28994:
URL: https://github.com/apache/spark/pull/28994#issuecomment-659869847


   This is an interesting idea and a good start. Just considering the runTime of a task alone might not be useful in many cases. Thanks!

