Posted to issues@spark.apache.org by "Franklyn Dsouza (JIRA)" <ji...@apache.org> on 2017/01/20 02:46:26 UTC

[jira] [Updated] (SPARK-19299) Nulls in non nullable columns causes data corruption in parquet

     [ https://issues.apache.org/jira/browse/SPARK-19299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Franklyn Dsouza updated SPARK-19299:
------------------------------------
    Description: 
The problem we're seeing is that if a null occurs in a non-nullable field and is written out to Parquet, the resulting file gets corrupted and cannot be read back correctly.

One way this can occur is when a long value in Python is too big to fit into a Spark LongType: rather than raising an error, it gets cast to null.
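
For reference, LongType is a signed 64-bit integer, so the boundary can be checked on the Python side before the data reaches Spark; a minimal sketch (illustrative only, not part of the repro below):

{code}
# LongType is a signed 64-bit integer; any Python int outside this range
# ends up as null in the resulting DataFrame instead of raising an error.
LONG_MIN = -(2 ** 63)
LONG_MAX = 2 ** 63 - 1

def fits_in_long(value):
    """Return True if `value` can be stored in a Spark LongType column."""
    return LONG_MIN <= value <= LONG_MAX

print(fits_in_long(2 ** 64))  # False -- this is the value that becomes null below
{code}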

We're also seeing that the behaviour is different depending on whether or not the vectorized reader is enabled.
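
For anyone reproducing this, the two code paths can be toggled with the spark.sql.parquet.enableVectorizedReader setting (shown here on the same sqlCtx used in the repro; this snippet is illustrative and not from the original report):

{code}
# Spark 2.x: the Parquet vectorized reader is on by default; set this to
# "false" to exercise the row-based reader instead.
sqlCtx.setConf("spark.sql.parquet.enableVectorizedReader", "false")
{code}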

Here's an example in PySpark:

{code}
from pyspark.sql import types

# Two non-nullable LongType columns; 2 ** 64 does not fit in a signed
# 64-bit long, so it becomes null despite nullable=False.
data = [
  (1, 6),
  (2, 7),
  (3, 2 ** 64),
  (4, 8),
  (5, 9)
]

schema = types.StructType([
  types.StructField("index", types.LongType(), False),
  types.StructField("long", types.LongType(), False),
])

df = sqlCtx.createDataFrame(data, schema)

df.collect()

# Write the DataFrame out to Parquet and read it straight back.
df.write.parquet("corrupt_parquet")

df_parquet = sqlCtx.read.parquet("corrupt_parquet/*.parquet")

df_parquet.collect()
{code}

With the vectorized reader enabled, this produces:

{code}
In [2]: df.collect()
Out[2]:
[Row(index=1, long=6),
 Row(index=2, long=7),
 Row(index=3, long=None),
 Row(index=4, long=8),
 Row(index=5, long=9)]

In [3]: df_parquet.collect()
Out[3]:
[Row(index=1, long=6),
 Row(index=2, long=7),
 Row(index=3, long=8),
 Row(index=4, long=9),
 Row(index=5, long=5)]
{code}

As you can see, reading the data back from disk causes values to get shifted up and across columns.

With the vectorized reader disabled, we are completely unable to read the file; the EOFException in the trace below suggests fewer long values were written to the page than the row count claims:

{code}
Py4JJavaError: An error occurred while calling o143.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, localhost): org.apache.parquet.io.ParquetDecodingException: Can not read value at 4 in block 0 in file file:/Users/franklyndsouza/dev/starscream/corrupt/part-r-00000-4fa5aee8-2138-4e0c-b6d8-22a418d90fd3.snappy.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can't read value in column [long] INT64 at value 5 out of 5, 5 out of 5 in currentPage. repetition level: 0, definition level: 0
	at org.apache.parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:462)
	at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:364)
	at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:209)
	... 19 more
Caused by: org.apache.parquet.io.ParquetDecodingException: could not read long
	at org.apache.parquet.column.values.plain.PlainValuesReader$LongPlainValuesReader.readLong(PlainValuesReader.java:131)
	at org.apache.parquet.column.impl.ColumnReaderImpl$2$4.read(ColumnReaderImpl.java:258)
	at org.apache.parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:458)
	... 22 more
Caused by: java.io.EOFException
	at org.apache.parquet.bytes.LittleEndianDataInputStream.readFully(LittleEndianDataInputStream.java:90)
	at org.apache.parquet.bytes.LittleEndianDataInputStream.readLong(LittleEndianDataInputStream.java:377)
	at org.apache.parquet.column.values.plain.PlainValuesReader$LongPlainValuesReader.readLong(PlainValuesReader.java:129)
{code}
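
Until this is fixed, a possible mitigation (our assumption, not verified above) is to keep nulls out of the required columns, either by declaring the fields nullable or by dropping out-of-range values before building the DataFrame. A sketch against the repro above:

{code}
# Two possible workarounds, sketched against the repro above (not verified here).

# (a) Declare the columns nullable; optional Parquet fields carry definition
#     levels, so a null should round-trip instead of corrupting the file.
nullable_schema = types.StructType([
  types.StructField("index", types.LongType(), True),
  types.StructField("long", types.LongType(), True),
])

# (b) Drop rows holding values that cannot fit in a signed 64-bit long before
#     the data ever reaches Spark.
LONG_MIN, LONG_MAX = -(2 ** 63), 2 ** 63 - 1
clean_data = [row for row in data
              if all(LONG_MIN <= v <= LONG_MAX for v in row)]

df_clean = sqlCtx.createDataFrame(clean_data, schema)
{code}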


> Nulls in non nullable columns causes data corruption in parquet
> ---------------------------------------------------------------
>
>                 Key: SPARK-19299
>                 URL: https://issues.apache.org/jira/browse/SPARK-19299
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core
>    Affects Versions: 1.6.0, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>            Reporter: Franklyn Dsouza



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org