You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Kay Ousterhout (JIRA)" <ji...@apache.org> on 2016/06/08 18:43:21 UTC

[jira] [Commented] (SPARK-14485) Task finished cause fetch failure when its executor has already been removed by driver

    [ https://issues.apache.org/jira/browse/SPARK-14485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15321177#comment-15321177 ] 

Kay Ousterhout commented on SPARK-14485:
----------------------------------------

I commented on the pull request, but want to continue the discussion here for archiving purposes.

My understanding is that this pull request fixes the following sequence of events:
(1) A task completes on an executor
(2) The executor fails
(3) The scheduler is notified about the task completing.
(4) A future stage that depends on the task runs, and fails, because the executor where the data was stored has failed.

With the proposed pull request, in step (3), the scheduler ignores the update, because it came from a failed executor.

I don't think we should do this for a few reasons:

(a) If the task didn't have a result stored on the executor (e.g., it computed some result on the RDD that it sent directly back to the master, like counting the elements in the RDD), it doesn't need to be failed, and can complete successfully.  With this change, we'd unnecessarily re-run the task.
(b) If the task did had in IndirectTaskResult (where it was too big to be sent directly to the master), the TaskResultGetter will fail to get the task result, and the task will be marked as failed.  This already worked correctly with the old code (AFAIK).
(c) This change is attempting to fix a third case, where the task had shuffle data that's now inaccessible because the machine had died.  I don't think it makes sense to fix this, because you can imagine a slight change in timing that causes the order of (2) and (3) above to be swapped.  In this case, even with the proposed code change, we're still stuck with the fetch failure and re-running the map stage.  Furthermore, it's possible (and likely!) that there were other map tasks that ran on the failed executor, and those tasks won't be failed and re-run with this change, so the reduce stage will still fail.  In general, the reason we have the fetch failure mechanism is because it can happen that shuffle data gets lost, and rather than detecting every kind of map-side failure, it's simpler to fail on the reduce side and then re-run the necessary tasks in the map stage.

Given all of the above, I'd advocate for reverting this change and marking the JIRA as won't fix.  [~vanzin] [~iward] let me know what your thoughts are. 

> Task finished cause fetch failure when its executor has already been removed by driver 
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-14485
>                 URL: https://issues.apache.org/jira/browse/SPARK-14485
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.3.1, 1.5.2
>            Reporter: iward
>            Assignee: iward
>             Fix For: 2.0.0
>
>
> Now, when executor is removed by driver with heartbeats timeout, driver will re-queue the task on this executor and send a kill command to cluster to kill this executor.
> But, in a situation, the running task of this executor is finished and return result to driver before this executor killed by kill command sent by driver. At this situation, driver will accept the task finished event and ignore  speculative task and re-queued task. But, as we know, this executor has removed by driver, the result of this finished task can not save in driver because the *BlockManagerId* has also removed from *BlockManagerMaster* by driver. So, the result data of this stage is not complete, and then, it will cause fetch failure.
> For example, the following is the task log:
> {noformat}
> 2015-12-31 04:38:50 INFO 15/12/31 04:38:50 WARN HeartbeatReceiver: Removing executor 322 with no recent heartbeats: 256015 ms exceeds timeout 250000 ms
> 2015-12-31 04:38:50 INFO 15/12/31 04:38:50 ERROR YarnScheduler: Lost executor 322 on BJHC-HERA-16168.hadoop.jd.local: Executor heartbeat timed out after 256015 ms
> 2015-12-31 04:38:50 INFO 15/12/31 04:38:50 INFO TaskSetManager: Re-queueing tasks for 322 from TaskSet 107.0
> 2015-12-31 04:38:50 INFO 15/12/31 04:38:50 WARN TaskSetManager: Lost task 229.0 in stage 107.0 (TID 10384, BJHC-HERA-16168.hadoop.jd.local): ExecutorLostFailure (executor 322 lost)
> 2015-12-31 04:38:50 INFO 15/12/31 04:38:50 INFO DAGScheduler: Executor lost: 322 (epoch 11)
> 2015-12-31 04:38:50 INFO 15/12/31 04:38:50 INFO BlockManagerMasterEndpoint: Trying to remove executor 322 from BlockManagerMaster.
> 2015-12-31 04:38:50 INFO 15/12/31 04:38:50 INFO BlockManagerMaster: Removed 322 successfully in removeExecutor
> {noformat}
> {noformat}
> 2015-12-31 04:38:52 INFO 15/12/31 04:38:52 INFO TaskSetManager: Finished task 229.0 in stage 107.0 (TID 10384) in 272315 ms on BJHC-HERA-16168.hadoop.jd.local (579/700)
> 2015-12-31 04:40:12 INFO 15/12/31 04:40:12 INFO TaskSetManager: Ignoring task-finished event for 229.1 in stage 107.0 because task 229 has already completed successfully
> {noformat}
> {noformat}
> 2015-12-31 04:40:12 INFO 15/12/31 04:40:12 INFO DAGScheduler: Submitting 3 missing tasks from ShuffleMapStage 107 (MapPartitionsRDD[263] at mapPartitions at Exchange.scala:137)
> 2015-12-31 04:40:12 INFO 15/12/31 04:40:12 INFO YarnScheduler: Adding task set 107.1 with 3 tasks
> 2015-12-31 04:40:12 INFO 15/12/31 04:40:12 INFO TaskSetManager: Starting task 0.0 in stage 107.1 (TID 10863, BJHC-HERA-18043.hadoop.jd.local, PROCESS_LOCAL, 3745 bytes)
> 2015-12-31 04:40:12 INFO 15/12/31 04:40:12 INFO TaskSetManager: Starting task 1.0 in stage 107.1 (TID 10864, BJHC-HERA-9291.hadoop.jd.local, PROCESS_LOCAL, 3745 bytes)
> 2015-12-31 04:40:12 INFO 15/12/31 04:40:12 INFO TaskSetManager: Starting task 2.0 in stage 107.1 (TID 10865, BJHC-HERA-16047.hadoop.jd.local, PROCESS_LOCAL, 3745 bytes)
> {noformat}
> Driver will check the stage's result is not complete, and submit missing task, but this time, the next stage has run because previous stage has finish for its task is all finished although its result is not complete.
> {noformat}
> 2015-12-31 04:40:13 INFO 15/12/31 04:40:13 WARN TaskSetManager: Lost task 39.0 in stage 109.0 (TID 10905, BJHC-HERA-9357.hadoop.jd.local): FetchFailed(null, shuffleId=11, mapId=-1, reduceId=39, message=
> 2015-12-31 04:40:13 INFO org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 11
> 2015-12-31 04:40:13 INFO at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385)
> 2015-12-31 04:40:13 INFO at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382)
> 2015-12-31 04:40:13 INFO at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 2015-12-31 04:40:13 INFO at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 2015-12-31 04:40:13 INFO at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> 2015-12-31 04:40:13 INFO at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> 2015-12-31 04:40:13 INFO at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> 2015-12-31 04:40:13 INFO at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> 2015-12-31 04:40:13 INFO at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:381)
> 2015-12-31 04:40:13 INFO at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:172)
> 2015-12-31 04:40:13 INFO at org.apache.spark.shuffle.sort.SortShuffleReader.computeShuffleBlocks(SortShuffleReader.scala:301)
> 2015-12-31 04:40:13 INFO at org.apache.spark.shuffle.sort.SortShuffleReader.read(SortShuffleReader.scala:111)
> 2015-12-31 04:40:13 INFO at org.apache.spark.shuffle.sort.MixedShuffleReader.read(MixedShuffleReader.scala:41)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> 2015-12-31 04:40:13 INFO at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
> 2015-12-31 04:40:13 INFO at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> 2015-12-31 04:40:13 INFO at org.apache.spark.scheduler.Task.run(Task.scala:64)
> 2015-12-31 04:40:13 INFO at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:209)
> 2015-12-31 04:40:13 INFO at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 2015-12-31 04:40:13 INFO at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 2015-12-31 04:40:13 INFO at java.lang.Thread.run(Thread.java:745)
> 2015-12-31 04:40:13 INFO 
> {noformat}
> As the task log show, in this situation, it will casue FetchFailedException.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org