Posted to issues@spark.apache.org by "Josh Rosen (JIRA)" <ji...@apache.org> on 2019/07/10 00:23:00 UTC

[jira] [Comment Edited] (SPARK-27570) java.io.EOFException Reached the end of stream - Reading Parquet from Swift

    [ https://issues.apache.org/jira/browse/SPARK-27570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16881623#comment-16881623 ] 

Josh Rosen edited comment on SPARK-27570 at 7/10/19 12:22 AM:
--------------------------------------------------------------

I ran into a very similar issue, except I was reading from S3 instead of OpenStack Swift. In my reproduction, the addition or removal of filters or projections affected whether I hit the error. In my case, I think the problem was https://issues.apache.org/jira/browse/HADOOP-16109, an issue where Parquet could sometimes use access patterns that hit a bug in seek() in S3AInputStream (/cc [~stevel@apache.org]). I confirmed this by re-running my failing job against an exact copy of the data stored on HDFS (which succeeded).
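For reference, a minimal sketch of that comparison (paths and column names here are hypothetical; the point is that only the filesystem differs between the two runs):

{code:java}
// Hypothetical repro: the same read + filter + aggregate, pointed first at the
// S3A copy of the data and then at a byte-identical copy on HDFS. The S3A run
// hit the EOFException; the HDFS run succeeded, which points at the input
// stream layer rather than at the Parquet files themselves.
val fromS3   = spark.read.parquet("s3a://my-bucket/events/")
val fromHdfs = spark.read.parquet("hdfs:///copies/events/")

fromS3.where("eventType != 0").groupBy("accountId").count().show(false)   // failed
fromHdfs.where("eventType != 0").groupBy("accountId").count().show(false) // succeeded
{code}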


was (Author: joshrosen):
I ran into a very similar issue, except I was reading from S3 instead of OpenStack Swift. In my reproduction, the addition or removal of filters or projections affected whether I hit the error. In my case, I think the problem was https://issues.apache.org/jira/browse/HADOOP-16109, an issue where Parquet could sometimes use access patterns that hit a bug in seek() in S3AInputStream (/cc [~stevel@apache.org]).

> java.io.EOFException Reached the end of stream - Reading Parquet from Swift
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-27570
>                 URL: https://issues.apache.org/jira/browse/SPARK-27570
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Harry Hough
>            Priority: Major
>
> I did see SPARK-25966, but there seem to be some differences: that problem was resolved by rebuilding the Parquet files on write, whereas this is 100% reproducible for me across many different days of data.
> I get exceptions such as "Reached the end of stream with 750477 bytes left to read" during some read operations on Parquet files. I am reading these files from OpenStack Swift using openstack-hadoop 2.7.7 on Spark 2.4.
> The issue seems to be triggered by the where clause. I have also tried filter, combining the two conditions into a single statement, and the Dataset filter with a Column argument, all without any luck. Which column the where filters on, and what the actual condition is, also makes no difference to whether the error occurs.
>  
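> For reference, these are roughly the equivalent filter variants I tried (just a sketch; the rest of the query is the same as the where version below, and it assumes spark.implicits._ is in scope as in the other snippets), all of which hit the same error:
> {code:java}
> // Single combined condition instead of two where() calls
> spark.read
>   .parquet(createSwiftAddr("engagements", folder))
>   .filter("engtype != 0 AND engtype != 1000")
>   // ... followed by the same groupBy/agg as below
>
> // Dataset/Column API instead of a SQL expression string
> spark.read
>   .parquet(createSwiftAddr("engagements", folder))
>   .filter($"engtype" =!= 0 && $"engtype" =!= 1000)
>   // ... followed by the same groupBy/agg as below
> {code}
> The where-based version and the resulting stack trace: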
> {code:java}
>     val engagementDS = spark
>       .read
>       .parquet(createSwiftAddr("engagements", folder))
>       .where("engtype != 0")
>       .where("engtype != 1000")
>       .groupBy($"accid", $"sessionkey")
>       .agg(collect_list(struct($"time", $"pid", $"engtype", $"pageid", $"testid")).as("engagements"))
> // Exiting paste mode, now interpreting.
> [Stage 53:> (0 + 32) / 32]2019-04-25 19:02:12 ERROR Executor:91 - Exception in task 24.0 in stage 53.0 (TID 688)
> java.io.EOFException: Reached the end of stream with 1323959 bytes left to read
>     at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:104)
>     at org.apache.parquet.io.DelegatingSeekableInputStream.readFullyHeapBuffer(DelegatingSeekableInputStream.java:127)
>     at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:91)
>     at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1174)
>     at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
>     at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:301)
>     at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:256)
>     at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:159)
>     at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>     at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:107)
>     at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:105)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:823)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:823)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>     at org.apache.spark.scheduler.Task.run(Task.scala:121)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> The above gives the error 100% of the time.
> {code:java}
>     val engagementDS = spark
>       .read
>       .parquet(createSwiftAddr("engagements", folder))
>       .count
> {code}
> This works correctly, as does doing a .show(false).
> {code:java}
>     val engagementDS = spark
>       .read
>       .parquet(createSwiftAddr("engagements", folder))
>       .groupBy($"accid", $"sessionkey")
>       .agg(collect_list(struct($"time", $"pid", $"engtype", $"pageid", $"testid")).as("engagements"))
>       .show(false)
> {code}
> This also works correctly.
> {code:java}
>     val engagementDS = spark
>       .read
>       .parquet(createSwiftAddr("engagements", folder))
>       .where("engtype != 0")
>       .count
> {code}
> The above code works, but if I do .show(false) instead of .count it breaks with the "Reached the end of stream" error.
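> To be explicit, the failing variant is the same read with .show(false) as the action (a sketch):
> {code:java}
>     spark
>       .read
>       .parquet(createSwiftAddr("engagements", folder))
>       .where("engtype != 0")
>       .show(false)   // fails with the same EOFException
> {code}
> In contrast, selecting only the engtype column with both filters works: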
> {code:java}
> engagementDS.select($"engtype").where("engtype != 0").where("engtype != 1000").show(false)
> +-------+
> |engtype|
> +-------+
> |10     |
> |17     |
> |4      |
> |4      |
> |10     |
> |17     |
> |15     |
> |10     |
> |17     |
> |10     |
> |16     |
> |15     |
> |10     |
> |16     |
> |15     |
> |15     |
> |10     |
> |4      |
> |10     |
> |17     |
> +-------+
> only showing top 20 rows
> {code}
> The above also works correctly.
> I can work around the issue with the code below, so it seems all the data is there:
>  
> {code:java}
>     val engagementDS = spark
>       .read
>       .parquet(createSwiftAddr("engagements", folder))
>       //.filter("engtype != 0 AND engtype != 1000")
>       .groupBy($"accid", $"sessionkey")
>       .agg(collect_list(struct($"time", $"pid", $"engtype", $"pageid", $"testid")).as("engagements"))
>       .selectExpr("accid", "sessionkey", "filter(engagements, x -> x.engtype != 1000 AND x.engtype != 0) AS engagements")
> {code}
>  
>  
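> Possibly related: the workaround above filters inside the aggregated array rather than in the scan. A hypothetical experiment would be to keep the original where but disable Parquet filter pushdown, to check whether the pushed-down filter is what changes the read pattern (just a sketch; I am not sure that setting is relevant here):
> {code:java}
> // Hypothetical experiment: evaluate the predicate in Spark after the scan
> // instead of pushing it down into the Parquet reader.
> spark.conf.set("spark.sql.parquet.filterPushdown", "false")
>
> spark.read
>   .parquet(createSwiftAddr("engagements", folder))
>   .where("engtype != 0")
>   .where("engtype != 1000")
>   .show(false)
> {code}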
> Even if I'm doing something incorrectly, this seems like a very strange error message :)
> Thanks in advance for any help.


