Posted to issues@spark.apache.org by "Shixiong Zhu (JIRA)" <ji...@apache.org> on 2018/01/12 00:57:00 UTC

[jira] [Comment Edited] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.

    [ https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16323320#comment-16323320 ] 

Shixiong Zhu edited comment on SPARK-23050 at 1/12/18 12:56 AM:
----------------------------------------------------------------

How do you read the output? If you use Spark to read the output, it will read only the successfully committed files, which are recorded in the file sink metadata.


was (Author: zsxwing):
How do you read the output? If you use Spark to read the output, it will read only the successfully committed files, which are recorded in the query metadata.
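
To illustrate the point: when Spark reads back a directory written by the file sink, it detects the {{_spark_metadata}} log in that directory and lists only the files recorded there as committed, so the orphaned file from the failed attempt is skipped. A minimal PySpark sketch, reusing the output path from the report below and assuming an existing {{spark}} session (e.g. from the pyspark shell):
{code}
# Minimal sketch: read the sink output back through Spark.
# Spark notices the _spark_metadata log under the path and lists only
# the committed files, so the orphaned duplicate part file is ignored.
df = spark.read.parquet("s3://path/data/")
print(df.count())  # rows from committed files only, no duplicates
{code}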

> Structured Streaming with S3 file source duplicates data because of eventual consistency.
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-23050
>                 URL: https://issues.apache.org/jira/browse/SPARK-23050
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Yash Sharma
>
> Spark Structured Streaming with an S3 file source duplicates data because of S3's eventual consistency.
> Reproducing the scenario:
> - Structured Streaming reads from an S3 source and writes back to S3.
> - On completing a task, Spark commits it via {{ManifestFileCommitProtocol.commitTask}}, verifying that all files written by the task are present on the filesystem.
> - [Eventual consistency issue] The S3 check does not yet see the file, so Spark fails the task: {{org.apache.spark.SparkException: Task failed while writing rows. No such file or directory 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'}}
> - By this time the file has eventually become visible in S3.
> - Spark reruns the task, which now completes but under a new file name generated by {{ManifestFileCommitProtocol.newTaskTempFile}}: {{part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet}} (see the naming sketch after this list).
> - The same data is therefore processed twice and written to S3 twice, duplicating it in the results.
> - There is no duplication when Spark can list all committed files and every task succeeds on the first attempt.
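> The retry gets a new name because the sink generates a fresh random UUID per written file. A minimal Python sketch of the naming scheme (the real code is Scala in {{ManifestFileCommitProtocol.newTaskTempFile}}; the exact pattern and extension here are assumptions inferred from the file names above):
> {code}
> # Rough model: a fresh UUID per attempt means a retried task never
> # reuses the earlier attempt's file name, leaving that file orphaned.
> import uuid
>
> def new_task_temp_file(partition_id, ext="-c000.snappy.parquet"):
>     return "part-%05d-%s%s" % (partition_id, uuid.uuid4(), ext)
>
> print(new_task_temp_file(256))  # first attempt
> print(new_task_temp_file(256))  # retried attempt: a different name
> {code}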
> Code:
> {code}
> # selected_df is the streaming DataFrame read from the S3 source
> query = selected_df.writeStream \
>     .format("parquet") \
>     .option("compression", "snappy") \
>     .option("path", "s3://path/data/") \
>     .option("checkpointLocation", "s3://path/checkpoint/") \
>     .start()
> {code}
> Same-sized duplicate S3 files:
> {code}
> $ aws s3 ls s3://path/data/ | grep part-00256
> 2018-01-11 03:37:00      17070 part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet
> 2018-01-11 03:37:10      17070 part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet
> {code}
> Exception on S3 listing and task failure:
> {code}
> [Stage 5:========================>                            (277 + 100) / 597]18/01/11 03:36:59 WARN TaskSetManager: Lost task 256.0 in stage 5.0 (TID  org.apache.spark.SparkException: Task failed while writing rows
>  	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
>  	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
>  	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
>  	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>  	at org.apache.spark.scheduler.Task.run(Task.scala:108)
>  	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>  	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  	at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.io.FileNotFoundException: No such file or directory 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'
>  	at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:816)
>  	at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:509)
>  	at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>  	at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>  	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>  	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>  	at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitTask(ManifestFileCommitProtocol.scala:109)
>  	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:260)
>  	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
>  	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
>  	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
>  	... 8 more
> {code}
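> The trace shows {{commitTask}} calling {{getFileStatus}} on each file the task wrote in order to build the manifest entries. A rough Python model of that step (not the Spark source, which is Scala; local-filesystem calls stand in for the Hadoop FileSystem API):
> {code}
> # Rough model of the commit-time existence check seen in the trace:
> # stat every file the task wrote; on an eventually consistent store
> # the stat can fail even though the write succeeded, failing the task.
> import os
>
> def commit_task(added_paths):
>     statuses = []
>     for path in added_paths:
>         if not os.path.exists(path):  # stands in for getFileStatus
>             raise IOError("No such file or directory '%s'" % path)
>         statuses.append((path, os.path.getsize(path)))
>     return statuses  # sent to the driver and logged in _spark_metadata
> {code}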


