You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Xuesen Liang (Jira)" <ji...@apache.org> on 2020/01/02 07:43:00 UTC

[jira] [Commented] (SPARK-30404) Fix wrong log for FetchFailed task's successful speculation

    [ https://issues.apache.org/jira/browse/SPARK-30404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006640#comment-17006640 ] 

Xuesen Liang commented on SPARK-30404:
--------------------------------------

 The following is spark driver log of the fixed version:

 
{code:java}
20/01/02 15:09:17 INFO SparkContext: Starting job: main at NativeMethodAccessorImpl.java:0
20/01/02 15:09:17 INFO DAGScheduler: Registering RDD 2 (main at NativeMethodAccessorImpl.java:0)
20/01/02 15:09:17 INFO DAGScheduler: Got job 0 (main at NativeMethodAccessorImpl.java:0) with 2 output partitions
20/01/02 15:09:17 INFO DAGScheduler: Final stage: ResultStage 1 (main at NativeMethodAccessorImpl.java:0)
20/01/02 15:09:17 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
20/01/02 15:09:17 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
20/01/02 15:09:17 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
20/01/02 15:09:18 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1))
20/01/02 15:09:18 INFO YarnClusterScheduler: Adding task set 0.0 with 2 tasks
20/01/02 15:09:18 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 100.76.30.139, executor 6, partition 0, PROCESS_LOCAL, 7704 bytes)
20/01/02 15:09:18 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 9.10.22.124, executor 1, partition 1, PROCESS_LOCAL, 7705 bytes)
20/01/02 15:09:50 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 32602 ms on 100.76.30.139 (executor 6) (1/2)
20/01/02 15:09:58 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 40533 ms on 9.10.22.124 (executor 1) (2/2)
20/01/02 15:09:58 INFO YarnClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
20/01/02 15:09:58 INFO DAGScheduler: ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0) finished in 40.937 s
20/01/02 15:09:58 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[6] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
20/01/02 15:09:58 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[6] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1))
20/01/02 15:09:58 INFO YarnClusterScheduler: Adding task set 1.0 with 2 tasks
20/01/02 15:09:58 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 100.76.30.139, executor 6, partition 0, NODE_LOCAL, 7929 bytes)
20/01/02 15:09:58 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 9.10.22.124, executor 1, partition 1, NODE_LOCAL, 7929 bytes)
20/01/02 15:09:58 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 115 ms on 100.76.30.139 (executor 6) (1/2)
20/01/02 15:09:59 INFO TaskSetManager: Marking task 1 in stage 1.0 (on 9.10.22.124) as speculatable because it ran more than 230 ms
20/01/02 15:09:59 INFO TaskSetManager: Starting task 1.1 in stage 1.0 (TID 4, 9.10.133.232, executor 2, partition 1, ANY, 7929 bytes)
20/01/02 15:09:59 WARN TaskSetManager: Lost task 1.1 in stage 1.0 (TID 4, 9.10.133.232, executor 2): FetchFailed(BlockManagerId(6, 100.76.30.139, 7337, None), shuffleId=0, mapId=0, reduceId=1, message=Connection reset by peer)
20/01/02 15:09:59 INFO TaskSetManager: Task 1.1 in stage 1.0 (TID 4) failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).
20/01/02 15:09:59 INFO DAGScheduler: Marking ResultStage 1 (main at NativeMethodAccessorImpl.java:0) as failed due to a fetch failure from ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0)
20/01/02 15:09:59 INFO DAGScheduler: ResultStage 1 (main at NativeMethodAccessorImpl.java:0) failed in 0.279 s due to org.apache.spark.shuffle.FetchFailedException: Connection reset by peer
20/01/02 15:09:59 INFO DAGScheduler: Resubmitting ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0) and ResultStage 1 (main at NativeMethodAccessorImpl.java:0) due to fetch failure
20/01/02 15:09:59 INFO DAGScheduler: Resubmitting failed stages
20/01/02 15:09:59 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
20/01/02 15:09:59 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
20/01/02 15:09:59 INFO YarnClusterScheduler: Adding task set 0.1 with 1 tasks
20/01/02 15:09:59 INFO TaskSetManager: Starting task 0.0 in stage 0.1 (TID 5, 100.76.32.68, executor 4, partition 0, PROCESS_LOCAL, 7704 bytes)




20/01/02 15:10:08 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 10096 ms on 9.10.22.124 (executor 1) (2/2)




20/01/02 15:10:08 INFO YarnClusterScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool 
20/01/02 15:10:08 INFO DAGScheduler: ResultStage 1 (main at NativeMethodAccessorImpl.java:0) finished in 10.116 s
20/01/02 15:10:08 INFO DAGScheduler: Job 0 finished: main at NativeMethodAccessorImpl.java:0, took 51.113884 s
20/01/02 15:10:31 INFO TaskSetManager: Finished task 0.0 in stage 0.1 (TID 5) in 32581 ms on 100.76.32.68 (executor 4) (1/1)
20/01/02 15:10:31 INFO YarnClusterScheduler: Removed TaskSet 0.1, whose tasks have all completed, from pool 
{code}

> Fix wrong log for FetchFailed task's successful speculation
> -----------------------------------------------------------
>
>                 Key: SPARK-30404
>                 URL: https://issues.apache.org/jira/browse/SPARK-30404
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.4
>            Reporter: Xuesen Liang
>            Priority: Minor
>
> Steps to reproduce the bug:
> 1. Spark speculation is enabled.
> 2. For a running task {{X.0}}, a speculative task {{X.1}} is launched.
> 3. Then this speculative task {{X.1}} failed by FetchFailedException, and {{successful(index)}} is set to true
> {code:scala}
>   def TaskSetManager#handleFailedTask(tid: Long, state: TaskState, reason: TaskFailedReason) {
>     ...
>     val failureException: Option[Throwable] = reason match {
>       case fetchFailed: FetchFailed =>
>         logWarning(failureReason)
>         if (!successful(index)) {
>           successful(index) = true
>           tasksSuccessful += 1
>         }
>     ...
> {code}
> 4. When the origin running task {{X.0}} finished successfully, it found {{successful(index) == true}}, then output log: {{Ignoring task-finished event ....}}
> {code:scala}
>   def TaskSetManager#handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
>     ...
>     if (!successful(index)) {
>       tasksSuccessful += 1
>       logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +
>         s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +
>         s" ($tasksSuccessful/$numTasks)")
>       // Mark successful and stop if all the tasks have succeeded.
>       successful(index) = true
>       if (tasksSuccessful == numTasks) {
>         isZombie = true
>       }
>     } else {
>       logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
>         " because task " + index + " has already completed successfully")
>     }
>     ...
> {code}
> But task {{X.0}} should output log: \{{Finished task X.0 in stage .... }}
>   
>  
> Relevant spark driver logs as follows:
> {code:scala}
> 20/01/02 11:21:45 INFO DAGScheduler: Got job 0 (main at NativeMethodAccessorImpl.java:0) with 2 output partitions
> 20/01/02 11:21:45 INFO DAGScheduler: Final stage: ResultStage 1 (main at NativeMethodAccessorImpl.java:0)
> 20/01/02 11:21:45 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
> 20/01/02 11:21:45 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
> 20/01/02 11:21:45 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
> 20/01/02 11:21:45 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1))
> 20/01/02 11:21:45 INFO YarnClusterScheduler: Adding task set 0.0 with 2 tasks
> 20/01/02 11:21:45 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes)
> 20/01/02 11:21:45 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 9.76.13.26, executor 2, partition 1, PROCESS_LOCAL, 7705 bytes)
> 20/01/02 11:22:18 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 32491 ms on 9.179.143.4 (executor 1) (1/2)
> 20/01/02 11:22:26 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 40544 ms on 9.76.13.26 (executor 2) (2/2)
> 20/01/02 11:22:26 INFO DAGScheduler: ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0) finished in 40.854 s
> 20/01/02 11:22:26 INFO YarnClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
> 20/01/02 11:22:26 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[6] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
> 20/01/02 11:22:26 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[6] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1))
> 20/01/02 11:22:26 INFO YarnClusterScheduler: Adding task set 1.0 with 2 tasks
> 20/01/02 11:22:26 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 9.179.143.4, executor 1, partition 0, NODE_LOCAL, 7929 bytes)
> 20/01/02 11:22:26 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 9.76.13.26, executor 2, partition 1, NODE_LOCAL, 7929 bytes)
> 20/01/02 11:22:26 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 79 ms on 9.179.143.4 (executor 1) (1/2)
> 20/01/02 11:22:26 INFO TaskSetManager: Marking task 1 in stage 1.0 (on 9.76.13.26) as speculatable because it ran more than 158 ms
> 20/01/02 11:22:26 INFO TaskSetManager: Starting task 1.1 in stage 1.0 (TID 4, 9.179.143.52, executor 3, partition 1, ANY, 7929 bytes)
> 20/01/02 11:22:26 WARN TaskSetManager: Lost task 1.1 in stage 1.0 (TID 4, 9.179.143.52, executor 3): FetchFailed(BlockManagerId(1, 9.179.143.4, 7337, None), shuffleId=0, mapId=0, reduceId=1, message=org.apache.spark.shuffle.FetchFailedException: Connection reset by peer)
> 20/01/02 11:22:26 INFO TaskSetManager: Task 1.1 in stage 1.0 (TID 4) failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).
> 20/01/02 11:22:26 INFO DAGScheduler: Marking ResultStage 1 (main at NativeMethodAccessorImpl.java:0) as failed due to a fetch failure from ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0)
> 20/01/02 11:22:26 INFO DAGScheduler: ResultStage 1 (main at NativeMethodAccessorImpl.java:0) failed in 0.261 s due to org.apache.spark.shuffle.FetchFailedException: Connection reset by peer
> 20/01/02 11:22:26 INFO DAGScheduler: Resubmitting ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0) and ResultStage 1 (main at NativeMethodAccessorImpl.java:0) due to fetch failure
> 20/01/02 11:22:26 INFO DAGScheduler: Resubmitting failed stages
> 20/01/02 11:22:26 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
> 20/01/02 11:22:26 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
> 20/01/02 11:22:26 INFO YarnClusterScheduler: Adding task set 0.1 with 1 tasks
> 20/01/02 11:22:26 INFO TaskSetManager: Starting task 0.0 in stage 0.1 (TID 5, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes)
> // NOTE: Here should be "INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 10000 ms on 9.76.13.26 (executor 2) (2/2)"
> 20/01/02 11:22:36 INFO TaskSetManager: Ignoring task-finished event for 1.0 in stage 1.0 because task 1 has already completed successfully
> 20/01/02 11:22:36 INFO YarnClusterScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool 
> 20/01/02 11:22:36 INFO DAGScheduler: ResultStage 1 (main at NativeMethodAccessorImpl.java:0) finished in 10.131 s
> 20/01/02 11:22:36 INFO DAGScheduler: Job 0 finished: main at NativeMethodAccessorImpl.java:0, took 51.031212 s
> 20/01/02 11:22:58 INFO TaskSetManager: Finished task 0.0 in stage 0.1 (TID 5) in 32029 ms on 9.179.143.4 (executor 1) (1/1)
> 20/01/02 11:22:58 INFO YarnClusterScheduler: Removed TaskSet 0.1, whose tasks have all completed, from pool
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org