Posted to issues@spark.apache.org by "Josh Rosen (JIRA)" <ji...@apache.org> on 2014/12/13 06:45:13 UTC

[jira] [Commented] (SPARK-4835) Streaming saveAs*HadoopFiles() methods may throw FileAlreadyExistsException during checkpoint recovery

    [ https://issues.apache.org/jira/browse/SPARK-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14245218#comment-14245218 ] 

Josh Rosen commented on SPARK-4835:
-----------------------------------

One subtlety here: we probably shouldn't rely on the SparkConf having {{spark.hadoop.validateOutputSpecs}} set to {{false}}, since that conf belongs to the SparkContext and the context may be shared with other non-streaming jobs / tasks. We also shouldn't mutate it: mutating SparkConf is, in general, a serious anti-pattern (even in internal code).
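
To make the sharing problem concrete, here's a minimal sketch (hypothetical app; the config key itself is the real one from the check below):

{code}
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical sketch: one conf backs a context shared by streaming and
// batch code. The validateOutputSpecs flag is context-wide, so setting it
// for streaming's benefit changes the behavior of every job on the context.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("shared-context-sketch")
  .set("spark.hadoop.validateOutputSpecs", "false")  // disables the check globally
val sc = new SparkContext(conf)

// An unrelated batch job on the same context now skips the existing-directory
// check and may silently overwrite prior output:
sc.parallelize(Seq("a", "b")).saveAsTextFile("/tmp/batch-output")
{code}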

Instead, we could add some plumbing so that every {{saveAs*}} RDD method accepts an optional parameter to disable output spec validation. This solution is kind of messy, though, since it ends up touching a lot of code: if we don't change it everywhere, there's a risk that we'll miss some corner case and re-introduce the bug.
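
A rough sketch of that plumbing (made-up names, not Spark's real API):

{code}
// Hypothetical sketch of the parameter-plumbing approach. Every public
// saveAs* overload has to grow and forward the flag, which is why the
// change fans out across so much code.
object PlumbingSketch {
  def saveAsHadoopFile(path: String,
                       bypassOutputSpecValidation: Boolean = false): Unit =
    saveAsHadoopDataset(path, bypassOutputSpecValidation)

  private def saveAsHadoopDataset(path: String, bypass: Boolean): Unit = {
    if (!bypass) {
      println(s"validating output specs for $path")  // stands in for checkOutputSpecs()
    }
    println(s"writing output to $path")
  }
}
{code}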

Alternatively, we might be able to use {{DynamicVariable}} to dynamically bypass the output spec checking for jobs launched by the streaming scheduler. This would have a fairly small impact on the source code and would avoid merge-conflict hell when backporting, though the indirection might be harder to understand. It would also be nicer from a user-facing point of view, since we wouldn't clutter the {{saveAs*}} methods with a {{bypassOutputSpecValidation}} boolean that's only used by streaming.
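
Something along these lines (a sketch with made-up names; the real flag would live somewhere in core and be set by the streaming {{JobScheduler}}):

{code}
import scala.util.DynamicVariable

// Hypothetical sketch: a flag in core that defaults to "validate as usual"
// and can be flipped within a dynamic scope.
object OutputSpecValidationSketch {
  val disableOutputSpecValidation = new DynamicVariable[Boolean](false)
}

// In PairRDDFunctions.saveAsHadoopDataset, the existing check would become:
//   if (conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
//       !OutputSpecValidationSketch.disableOutputSpecValidation.value) { ... }

object StreamingJobSketch extends App {
  // The streaming scheduler would wrap each output operation it runs. The
  // bypass is visible only within this dynamic scope, on this thread (and
  // any threads it spawns inside the scope).
  OutputSpecValidationSketch.disableOutputSpecValidation.withValue(true) {
    println(s"bypass = ${OutputSpecValidationSketch.disableOutputSpecValidation.value}")
    // ... invoke the saveAs* output operation here ...
  }
  // Outside the scope, the flag automatically reverts to false.
}
{code}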

> Streaming saveAs*HadoopFiles() methods may throw FileAlreadyExistsException during checkpoint recovery
> ------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-4835
>                 URL: https://issues.apache.org/jira/browse/SPARK-4835
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.0
>            Reporter: Josh Rosen
>            Assignee: Tathagata Das
>            Priority: Critical
>
> While running (a slightly modified version of) the "recovery with saveAsHadoopFiles operation" test in the streaming CheckpointSuite, I noticed the following error message in the streaming driver log:
> {code}
> 14/12/12 17:42:50.687 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO JobScheduler: Added jobs for time 1500 ms
> 14/12/12 17:42:50.687 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO RecurringTimer: Started timer for JobGenerator at time 2000
> 14/12/12 17:42:50.688 sparkDriver-akka.actor.default-dispatcher-3 INFO JobScheduler: Starting job streaming job 1500 ms.0 from job set of time 1500 ms
> 14/12/12 17:42:50.688 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO JobGenerator: Restarted JobGenerator at 2000 ms
> 14/12/12 17:42:50.688 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO JobScheduler: Started JobScheduler
> 14/12/12 17:42:50.689 sparkDriver-akka.actor.default-dispatcher-3 INFO JobScheduler: Starting job streaming job 1500 ms.1 from job set of time 1500 ms
> 14/12/12 17:42:50.689 sparkDriver-akka.actor.default-dispatcher-3 ERROR JobScheduler: Error running job streaming job 1500 ms.0
> org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/var/folders/0k/2qp2p2vs7bv033vljnb8nk1c0000gn/T/1418434967213-0/-1500.result already exists
> 	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:121)
> 	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1045)
> 	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:944)
> 	at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9.apply(PairDStreamFunctions.scala:677)
> 	at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9.apply(PairDStreamFunctions.scala:675)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> 	at scala.util.Try$.apply(Try.scala:161)
> 	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> 14/12/12 17:42:50.691 pool-12-thread-1 INFO SparkContext: Starting job: apply at Transformer.scala:22
> {code}
> Spark Streaming's {{saveAsHadoopFiles}} method calls Spark's {{rdd.saveAsHadoopFile}} method, which in turn calls {{PairRDDFunctions.saveAsHadoopDataset()}}. That method has error-checking to ensure that the output directory does not already exist:
> {code}
>     if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
>       // FileOutputFormat ignores the filesystem parameter
>       val ignoredFs = FileSystem.get(hadoopConf)
>       hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
>     }
> {code}
> If Spark Streaming recovers from a checkpoint and re-runs the last batch in the checkpoint, then {{saveAsHadoopDataset}} will have been called twice with the same output path.  If the output path exists from the first, pre-recovery run, then the recovery will fail.
> This seems like it could be a pretty serious issue: imagine that a streaming job fails partway through a save() operation, then recovers: in this case, the existing directory will prevent us from ever recovering and finishing the save().
> Fortunately, this should be simple to fix: we should disable the existing directory checks for output operations called by streaming jobs.
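> A minimal sketch of that fix, assuming a thread-scoped flag (name hypothetical) that the streaming scheduler sets around its output operations:
> {code}
>     if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
>         !PairRDDFunctions.disableOutputSpecValidation.value) {  // hypothetical flag
>       // FileOutputFormat ignores the filesystem parameter
>       val ignoredFs = FileSystem.get(hadoopConf)
>       hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
>     }
> {code}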



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org