Posted to issues@spark.apache.org by "Kaspar Tint (JIRA)" <ji...@apache.org> on 2018/12/13 12:47:00 UTC

[jira] [Updated] (SPARK-26359) Spark checkpoint restore fails after query restart

     [ https://issues.apache.org/jira/browse/SPARK-26359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kaspar Tint updated SPARK-26359:
--------------------------------
    Attachment: worker-redacted
                state-redacted
                redacted-offsets
                metadata
                driver-redacted

> Spark checkpoint restore fails after query restart
> --------------------------------------------------
>
>                 Key: SPARK-26359
>                 URL: https://issues.apache.org/jira/browse/SPARK-26359
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Submit, Structured Streaming
>    Affects Versions: 2.4.0
>         Environment: Spark 2.4.0 deployed in standalone-client mode
> Checkpointing is done to S3
> The Spark application in question is responsible for running 4 different queries
> Queries are written using Structured Streaming
>            Reporter: Kaspar Tint
>            Priority: Major
>         Attachments: driver-redacted, metadata, redacted-offsets, state-redacted, worker-redacted
>
>
> We had an incident where one of our Structured Streaming queries could not be restarted after a routine S3 checkpointing failure. To clarify up front: we are aware of the issues with S3 and are working towards moving to HDFS, but this will take time. S3 causes queries to fail quite often during peak hours, so we have separate logic that attempts to restart the failed queries when possible.
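> For context, a minimal sketch of what this kind of restart logic can look like (illustrative only, not the exact code running in our application; {{start}} stands for whatever builds and starts the query against its existing checkpoint location):
> {code:scala}
> import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}
>
> // Keep restarting the query while it keeps failing; stop once it terminates cleanly.
> def supervise(start: () => StreamingQuery): Unit = {
>   var keepRunning = true
>   while (keepRunning) {
>     val query = start()
>     try {
>       query.awaitTermination() // blocks until the query stops
>       keepRunning = false      // clean stop: do not restart
>     } catch {
>       case _: StreamingQueryException =>
>         // Typically the transient S3 FileNotFoundException during checkpointing;
>         // loop around and start the query again on the same checkpoint folder.
>     }
>   }
> }
> {code}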
> In this particular case we could not restart one of the failed queries. It seems that between detecting the failure and starting the query up again something went wrong inside Spark, and the state in the checkpoint folder got corrupted.
> The issue starts with the usual *FileNotFoundException* that we see with S3:
> {code:java}
> 2018-12-10 21:09:25.785 ERROR MicroBatchExecution: Query feedback [id = c074233a-2563-40fc-8036-b5e38e2e2c42, runId = e607eb6e-8431-4269-acab-cc2c1f9f09dd] terminated with error
> java.io.FileNotFoundException: No such file or directory: s3a://some.domain/spark/checkpoints/49/feedback/offsets/.37348.8227943f-a848-4af5-b5bf-1fea81775b24.tmp
>         at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
>         at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
>         at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
>         at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2715)
>         at org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
>         at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
>         at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
>         at org.apache.hadoop.fs.FileContext.rename(FileContext.java:965)
>         at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:331)
>         at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
>         at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataLog.scala:126)
>         at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112)
>         at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
>         at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
>         at scala.Option.getOrElse(Option.scala:121)
>         at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:382)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
>         at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:381)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
>         at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>         at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> 2018-12-10 21:09:25.791 WARN InsightsSparkAggregates$: Query feedback terminated with exception, attempting restart
> {code}
> The last log line states that a restart will be attempted for the query named *feedback*. We start the query up and encounter this almost immediately:
> {code:java}
> 2018-12-10 21:09:26.870 WARN InsightsSparkAggregates$: Query feedback currently not running, starting query in own scheduling pool
> 2018-12-10 21:09:51.776 WARN TaskSetManager: Lost task 11.0 in stage 66240.0 (TID 2782264, ec2-52-87-158-48.compute-1.amazonaws.com, executor 29): java.lang.IllegalStateException: Error reading delta file s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta of HDFSStateStoreProvider[id = (op=2,part=11),dir = s3a://some.domain/spark/checkpoints/49/feedback/state/2/11]: s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta does not exist
>         at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:427)
>         at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply$mcVJ$sp(HDFSBackedStateStoreProvider.scala:384)
>         at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply(HDFSBackedStateStoreProvider.scala:383)
>         at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply(HDFSBackedStateStoreProvider.scala:383)
>         at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
>         at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:383)
>         at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:356)
>         at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:535)
>         at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:356)
>         at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:204)
>         at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:371)
>         at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:88)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>         at org.apache.spark.scheduler.Task.run(Task.scala:121)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: No such file or directory: s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta
>         at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
>         at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
>         at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
>         at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
>         at org.apache.hadoop.fs.DelegateToFileSystem.open(DelegateToFileSystem.java:190)
>         at org.apache.hadoop.fs.AbstractFileSystem.open(AbstractFileSystem.java:649)
>         at org.apache.hadoop.fs.FileContext$6.next(FileContext.java:802)
>         at org.apache.hadoop.fs.FileContext$6.next(FileContext.java:798)
>         at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
>         at org.apache.hadoop.fs.FileContext.open(FileContext.java:804)
>         at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.open(CheckpointFileManager.scala:322)
>         at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:424)
>         ... 28 more
> {code}
> And this will go on forever until we bump the checkpoint folder name:
> {code:java}
> 2018-12-10 21:09:57.261 WARN TaskSetManager: Lost task 7.0 in stage 66265.0 (TID 2783200, ec2-34-236-156-197.compute-1.amazonaws.com, executor 40): java.lang.IllegalStateException: Error committing version 49464 into HDFSStateStore[id=(op=1,part=7),dir=s3a://some.domain/spark/checkpoints/49/dlr/state/1/7]
>         at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:138)
>         at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1$$anonfun$apply$1.apply$mcV$sp(FlatMapGroupsWithStateExec.scala:135)
>     .....
> {code}
> Looking into S3, it indeed appears that this delta file was never created. Instead we have a
> {code:java} 
> s3://some.domain/spark/checkpoints/49/feedback/state/2/11/.36870.delta.02e75d1f-aa48-4c61-9257-a5928b32e22e.TID2469780.tmp
> {code}
> file that, I assume, keeps that name while the whole write operation has not yet completed. This file never got renamed to 36870.delta, so the application keeps trying to reference a delta file that does not exist.
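> For clarity, the failure sits in the write-to-temp-then-rename pattern that the checkpoint file manager relies on: the batch is written to a hidden {{.tmp}} file and only the final rename makes {{36870.delta}} visible. A simplified sketch of that commit pattern (an illustration of the general idea, not Spark's actual {{CheckpointFileManager}} code):
> {code:scala}
> import java.util.EnumSet
> import org.apache.hadoop.fs.{CreateFlag, FileContext, Options, Path}
>
> // Write the data to a hidden ".<name>.tmp" file first, then "commit" it by renaming.
> def writeThenRename(fc: FileContext, finalPath: Path, bytes: Array[Byte]): Unit = {
>   val tmpPath = new Path(finalPath.getParent, s".${finalPath.getName}.tmp")
>   val out = fc.create(tmpPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE))
>   try out.write(bytes) finally out.close()
>   // Only after this rename does the final delta/offsets file exist. On s3a the rename
>   // is a copy + delete rather than an atomic move, so a failure at this step leaves
>   // only the .tmp object behind, which matches what we see in the bucket.
>   fc.rename(tmpPath, finalPath, Options.Rename.OVERWRITE)
> }
> {code}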
> All the relevant redacted logs are attached to this report, together with ls output of the S3 folders and the metadata file. If any more information is needed I would be happy to provide it. I would also appreciate some input on how best to resolve this issue. So far it has happened on 2 separate days, and the workaround each time has been to bump the checkpoint folder.
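> For completeness, "bumping the checkpoint" here simply means starting the query against a fresh checkpoint folder, which of course discards the accumulated state and offsets. Roughly (names are placeholders; {{feedbackStream}} stands for the streaming DataFrame behind the *feedback* query):
> {code:scala}
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.streaming.StreamingQuery
>
> // Point the restarted query at a fresh checkpoint folder, e.g. .../checkpoints/50/feedback.
> def restartWithFreshCheckpoint(feedbackStream: DataFrame, version: Int): StreamingQuery =
>   feedbackStream.writeStream
>     .queryName("feedback")
>     .option("checkpointLocation", s"s3a://some.domain/spark/checkpoints/$version/feedback")
>     .start()
> {code}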


