You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@parquet.apache.org by "Gabor Szadovszky (JIRA)" <ji...@apache.org> on 2019/03/13 07:27:00 UTC

[jira] [Resolved] (PARQUET-1531) Page row count limit causes empty pages to be written from MessageColumnIO

     [ https://issues.apache.org/jira/browse/PARQUET-1531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gabor Szadovszky resolved PARQUET-1531.
---------------------------------------
    Resolution: Fixed

> Page row count limit causes empty pages to be written from MessageColumnIO
> --------------------------------------------------------------------------
>
>                 Key: PARQUET-1531
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1531
>             Project: Parquet
>          Issue Type: Bug
>    Affects Versions: 1.11.0
>            Reporter: Matt Cheah
>            Assignee: Gabor Szadovszky
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>
> This originally manifested as https://issues.apache.org/jira/browse/SPARK-26874 but we realized that this is fundamentally an issue in the way PARQUET-1414's solution interacts with {{MessageColumnIO}}, where Spark is one such user of that API.
> In {{MessageColumnIO#endMessage()}}, we first examine if any fields are missing and fill in the values with null in {{MessageColumnIO#writeNullForMissingFieldsAtCurrentLevel}}. However, this method might not actually write any nulls to the underlying page. {{MessageColumnIO}} can buffer nulls in memory and flush them to the page store lazily.
> Regardless of whether or not nulls are flushed to the page store, in {{MessageColumnIO#endMessage}} we always call {{columns#endRecord()}} which will signal to the {{ColumnWriteStore}} that a record was written. At that point, the write store increments the row count for the current page by 1, and then check if the page needs to be flushed due to hitting the page row count limit.
> The problem is that with the above writing scheme, {{MessageColumnIO}} can cause empty pages to be written to Parquet files, and empty pages are not readable by Parquet readers. Suppose the page row count limit is N, and the {{MessageColumnIO}} receives N nulls for a column. The {{MessageColumnIO}} will buffer the nulls in memory, and doesn't necessarily flush the nulls to the writer yet. On the Nth call to {{endMessage()}}, however, the column store will think there are N values in memory and that the page has hit the row count limit, despite the fact that no rows have actually been written at all. But the underlying page writer will write an empty page regardless.
> To illustrate the problem, one can try running this simple example inserted into Spark's \{{ParquetIOSuite}} when Spark has been upgraded to use the master branch of Parquet. Attach a debugger to {{MessageColumnIO#endMessage()}} and trace the logic accordingly - the column writer will push a page with 0 values:
> {code:java}
> test("PARQUET-1414 Problems") {
>   // Manually adjust the maximum row count to reproduce the issue on small data
>   sparkContext.hadoopConfiguration.set("parquet.page.row.count.limit", "1")
>   withTempPath { location =>
>     val path = new Path(location.getCanonicalPath + "/parquet-data")
>     val schema = StructType(
>       Array(StructField("timestamps1", ArrayType(TimestampType))))
>     val rows = ListBuffer[Row]()
>     for (j <- 0 until 10) {
>       rows += Row(
>         null.asInstanceOf[Array[java.sql.Timestamp]])
>     }
>     val srcDf = spark.createDataFrame(
>       sparkContext.parallelize(rows, 3),
>       schema,
>       true)
>     srcDf.write.parquet(path.toString)
>     assert(spark.read.parquet(path.toString).collect.size > 0)
>   }
> }{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)