Posted to issues@spark.apache.org by "Zach Fry (JIRA)" <ji...@apache.org> on 2015/01/08 00:47:35 UTC

[jira] [Updated] (SPARK-4879) Missing output partitions after job completes with speculative execution

     [ https://issues.apache.org/jira/browse/SPARK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zach Fry updated SPARK-4879:
----------------------------
    Attachment: speculation2.txt
                speculation.txt

Hey Josh, 

I have been playing around with your repro above, and I think I can consistently trigger the bad behavior just by tweaking the values of {{spark.speculation.multiplier}} and {{spark.speculation.quantile}}.

I set the {{multiplier}} to 1 and the {{quantile}} to 0.01, so that once only 1% of the tasks in a stage have finished, any task running longer than the median of those finished tasks becomes eligible for speculation.
As expected, I see a lot of tasks getting speculated. 
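
For reference, here are the settings in question as a {{SparkConf}} sketch (passing the same keys with {{--conf}} on {{spark-shell}} works just as well):

{code}
// Rough configuration for these runs -- a sketch, not the exact commands from the attached logs.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")            // enable speculative execution
  .set("spark.speculation.multiplier", "1")    // speculate any task slower than the median of finished tasks
  .set("spark.speculation.quantile", "0.01")   // start checking once 1% of a stage's tasks have finished
{code}
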
After running the repro about 5 times, I have seen 2 errors (stack traces are at the bottom of this comment, and the full runs from the REPL are attached).

One thing I do notice is that the part-00000 file associated with Stage 1 was always where I expected it to be in HDFS, and all lines were present (checked using {{wc -l}}).
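
A quick equivalent check from the shell (just a sketch; substitute whatever directory the failing job actually wrote to):

{code}
// Count the lines of one output partition and compare against the expected record count.
// The path below mirrors the redacted paths in the attached logs.
sc.textFile("hdfs://<redacted-host-01>:8020/test6/part-00000").count()
{code}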


{code}
scala> 15/01/07 13:44:26 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 119, <redacted-host-02>): java.io.IOException: The temporary job-output directory hdfs://<redacted-host-01>:8020/test6/_temporary doesn't exist!
        org.apache.hadoop.mapred.FileOutputCommitter.getWorkPath(FileOutputCommitter.java:250)
        org.apache.hadoop.mapred.FileOutputFormat.getTaskOutputPath(FileOutputFormat.java:240)
        org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:116)
        org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:89)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:980)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
{code}

{code}
15/01/07 15:17:39 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 120, <redacted-host-03>): org.apache.hadoop.ipc.RemoteException: No lease on /test7/_temporary/_attempt_201501071517_0000_m_000000_120/part-00000: File does not exist. Holder DFSClient_NONMAPREDUCE_-469253416_73 does not have any open files.
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2609)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:2426)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2339)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:501)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:299)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44954)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1002)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1752)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1748)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1746)

        org.apache.hadoop.ipc.Client.call(Client.java:1238)
        org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
        com.sun.proxy.$Proxy9.addBlock(Unknown Source)
        sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        java.lang.reflect.Method.invoke(Method.java:606)
        org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
        org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
        com.sun.proxy.$Proxy9.addBlock(Unknown Source)
        org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:291)
        org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1177)
        org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1030)
        org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:488)
{code}


> Missing output partitions after job completes with speculative execution
> ------------------------------------------------------------------------
>
>                 Key: SPARK-4879
>                 URL: https://issues.apache.org/jira/browse/SPARK-4879
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, Spark Core
>    Affects Versions: 1.0.2, 1.1.1, 1.2.0
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>            Priority: Critical
>         Attachments: speculation.txt, speculation2.txt
>
>
> When speculative execution is enabled ({{spark.speculation=true}}), jobs that save output files may report that they have completed successfully even though some output partitions written by speculative tasks may be missing.
> h3. Reproduction
> This symptom was reported to me by a Spark user and I've been doing my own investigation to try to come up with an in-house reproduction.
> I'm still working on a reliable local reproduction for this issue, which is a little tricky because Spark won't schedule speculated tasks on the same host as the original task, so you need an actual (or containerized) multi-host cluster to test speculation.  Here's a simple reproduction of some of the symptoms on EC2, which can be run in {{spark-shell}} with {{--conf spark.speculation=true}}:
> {code}
>     // Rig a job such that all but one of the tasks complete instantly
>     // and one task runs for 20 seconds on its first attempt and instantly
>     // on its second attempt:
>     val numTasks = 100
>     sc.parallelize(1 to numTasks, numTasks).repartition(2).mapPartitionsWithContext { case (ctx, iter) =>
>       if (ctx.partitionId == 0) {  // If this is the one task that should run really slow
>         if (ctx.attemptId == 0) {  // If this is the first attempt, run slow
>          Thread.sleep(20 * 1000)
>         }
>       }
>       iter
>     }.map(x => (x, x)).saveAsTextFile("/test4")
> {code}
> When I run this, I end up with a job that completes quickly (due to speculation) but reports failures from the speculated task:
> {code}
> [...]
> 14/12/11 01:41:13 INFO scheduler.TaskSetManager: Finished task 37.1 in stage 3.0 (TID 411) in 131 ms on ip-172-31-8-164.us-west-2.compute.internal (100/100)
> 14/12/11 01:41:13 INFO scheduler.DAGScheduler: Stage 3 (saveAsTextFile at <console>:22) finished in 0.856 s
> 14/12/11 01:41:13 INFO spark.SparkContext: Job finished: saveAsTextFile at <console>:22, took 0.885438374 s
> 14/12/11 01:41:13 INFO scheduler.TaskSetManager: Ignoring task-finished event for 70.1 in stage 3.0 because task 70 has already completed successfully
> scala> 14/12/11 01:41:13 WARN scheduler.TaskSetManager: Lost task 49.1 in stage 3.0 (TID 413, ip-172-31-8-164.us-west-2.compute.internal): java.io.IOException: Failed to save output of task: attempt_201412110141_0003_m_000049_413
>         org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:160)
>         org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:172)
>         org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:132)
>         org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:109)
>         org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:991)
>         org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         java.lang.Thread.run(Thread.java:745)
> {code}
> One interesting thing to note about this stack trace: if we look at {{FileOutputCommitter.java:160}} ([link|http://grepcode.com/file/repository.cloudera.com/content/repositories/releases/org.apache.hadoop/hadoop-core/2.5.0-mr1-cdh5.2.0/org/apache/hadoop/mapred/FileOutputCommitter.java#160]), this point in the execution seems to correspond to a case where a task completes, attempts to commit its output, fails for some reason, then deletes the destination file, tries again, and fails:
> {code}
> 151    if (fs.isFile(taskOutput)) {
> 152      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, 
> 153                                          getTempTaskOutputPath(context));
> 154      if (!fs.rename(taskOutput, finalOutputPath)) {
> 155        if (!fs.delete(finalOutputPath, true)) {
> 156          throw new IOException("Failed to delete earlier output of task: " + 
> 157                                 attemptId);
> 158        }
> 159        if (!fs.rename(taskOutput, finalOutputPath)) {
> 160          throw new IOException("Failed to save output of task: " + 
> 161        		  attemptId);
> 162        }
> 163      }
> {code}
> This could explain why the output file is missing: the second copy of the task keeps running after the job completes and deletes the output written by the other task after failing to commit its own copy of the output.
> There are still a few open questions about how exactly we get into this scenario:
> *Why is the second copy of the task allowed to commit its output after the other task / the job has successfully completed?*
> To check whether a task's temporary output should be committed, {{SparkHadoopWriter}} calls {{FileOutputCommitter.needsTaskCommit()}}, which returns {{true}} if the task's temporary output exists ([link|http://grepcode.com/file/repository.cloudera.com/content/repositories/releases/org.apache.hadoop/hadoop-core/2.5.0-mr1-cdh5.2.0/org/apache/hadoop/mapred/FileOutputCommitter.java#206]).  This does not seem to check whether the destination already exists, which means that {{needsTaskCommit}} can return {{true}} for speculative tasks.
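>
> A rough paraphrase of that check (a sketch of the behavior described above, not the actual Hadoop source):
> {code}
> // Sketch only: the committer asks "does my temporary task output exist?",
> // never "has another attempt already committed this partition?".
> import org.apache.hadoop.fs.{FileSystem, Path}
>
> def needsTaskCommitSketch(fs: FileSystem, tempTaskOutput: Path): Boolean =
>   fs.exists(tempTaskOutput)  // also true for a speculative attempt's copy of the output
> {code}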
> *Why does the rename fail?*
> I think that what's happening is that the temporary task output files are being deleted once the job has completed, which is causing the {{rename}} to fail because {{FileOutputCommitter.commitTask}} doesn't seem to guard against missing output files.
> I'm not sure about this, though, since the stack trace seems to imply that the temporary output file existed.  Maybe the filesystem methods are returning stale metadata?  Maybe there's a race?  I think a race condition is pretty unlikely, since the time-scale at which it would have to happen doesn't line up with the timestamps that I saw in the user report.
> h3. Possible Fixes:
> The root problem here might be that speculative copies of tasks are somehow allowed to commit their output.  We might be able to fix this by centralizing the "should this task commit its output" decision at the driver.
> (I have more concrete suggestions for how to do this; to be posted soon.)
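>
> A very rough sketch of what a driver-side decision could look like (the names here are illustrative, not an existing Spark API): the driver records the first attempt that asks to commit each partition and denies every other attempt.
> {code}
> // Illustrative only -- not Spark's implementation. The driver keeps a map of
> // (stageId, partitionId) -> winning attemptId and authorizes exactly one committer.
> import scala.collection.mutable
>
> class CommitCoordinatorSketch {
>   private val winners = mutable.Map.empty[(Int, Int), Long]
>
>   /** Called (e.g. via RPC) by an executor before it commits task output. */
>   def canCommit(stageId: Int, partitionId: Int, attemptId: Long): Boolean = synchronized {
>     winners.get((stageId, partitionId)) match {
>       case Some(winner) => winner == attemptId   // only the first requester may commit
>       case None =>
>         winners((stageId, partitionId)) = attemptId
>         true
>     }
>   }
> }
> {code}
> With something like this in place, the speculative attempts seen in the traces above would be denied the commit, so they could not delete or clobber the output that the winning attempt already moved into place.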



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org