Posted to user@spark.apache.org by Lian Jiang <ji...@gmail.com> on 2019/03/07 05:58:22 UTC

spark structured streaming crash due to decompressing gzip file failure

Hi,

I have a structured streaming job which listens to an HDFS folder containing
jsonl.gz files. The job crashed with the following error:

java.io.IOException: incorrect header check
    at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
    at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:225)
    at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:152)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:192)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:186)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


Is there a way to skip the gz files that cannot be decompressed? Exception
handling does not seem to help. The only workaround I can think of is to
decompress the gz files into another folder first and have the Spark
streaming job listen to that new folder. But this workaround may not be any
better than giving up on structured streaming and using a plain
(non-structured-streaming) job to decompress the gz files directly, read the
jsonl files, validate the records, and write the validated records to parquet.

Any idea is highly appreciated!

Re: spark structured streaming crash due to decompressing gzip file failure

Posted by Lian Jiang <ji...@gmail.com>.
Thanks, it worked.

On Thu, Mar 7, 2019 at 5:05 AM Akshay Bhardwaj <
akshay.bhardwaj1988@gmail.com> wrote:

> Hi,
>
> In your spark-submit command, try using the below config property and see
> if this solves the problem.
>
> --conf spark.sql.files.ignoreCorruptFiles=true
>
> For me, this worked to skip reading empty or partially uploaded gzip files
> in an S3 bucket.
>
> Akshay Bhardwaj
> +91-97111-33849
>

Re: spark structured streaming crash due to decompressing gzip file failure

Posted by Akshay Bhardwaj <ak...@gmail.com>.
Hi,

In your spark-submit command, try using the below config property and see
if this solves the problem.

--conf spark.sql.files.ignoreCorruptFiles=true

For me, this worked to skip reading empty or partially uploaded gzip files in
an S3 bucket.
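
For completeness, the same flag can also be set from code instead of on the
spark-submit command line; a minimal sketch (the app name is a placeholder):

    import org.apache.spark.sql.SparkSession

    // Set the flag when building the session ...
    val spark = SparkSession.builder()
      .appName("jsonl-gz-stream")                           // placeholder app name
      .config("spark.sql.files.ignoreCorruptFiles", "true") // skip files that fail to read/decompress
      .getOrCreate()

    // ... or on an existing session at runtime.
    spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

Note that with this setting corrupt files are skipped rather than failing the
query, so skipped data can go unnoticed unless you track it separately (for
example by checking the executor logs for the corresponding warnings).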

Akshay Bhardwaj
+91-97111-33849

