Posted to dev@parquet.apache.org by Reynold Xin <rx...@databricks.com> on 2016/07/06 18:13:16 UTC

parquet-mr filter pushdown

Among the people working on Spark there is a lot of confusion about what
Parquet's filter pushdown actually accomplishes. Depending on who I talk
to, I get "it filters rows one by one" or "it skips blocks via min/max
value tracking". Can I get a more official response on this?

The reason I'm asking is that we have seen so many bugs related to filter
pushdown (either bugs in Parquet, or bugs in Spark's implementation of it)
that we are considering just permanently disabling filter pushdown, if the
performance gain is not enormous.

Let me know. Thanks.

Re: parquet-mr filter pushdown

Posted by Cheng Lian <li...@databricks.com>.
Filed PARQUET-654 for making record-level filtering optional.

Cheng

On 7/7/16 11:57 PM, Ryan Blue wrote:
> Sounds like there are two problems. First, PARQUET-389 needs to be 
> fixed. It looks like fixing it would have prevented the issues 
> introduced in the work-around, so it is probably a good idea to fix 
> the underlying problem as the way forward.
>
> The second issue is being able to filter row groups, but skip the 
> record-level filtering. This should be a really easy fix in the read 
> path, so please open an issue for it. I think you should be able to 
> get this into the 1.9.0 release. Also, this is somewhat related to the 
> vectorized read API we're putting together a hackathon to tackle, so 
> you may want to monitor that effort.
>
> rb


Re: parquet-mr filter pushdown

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Sounds like there are two problems. First, PARQUET-389 needs to be fixed.
It looks like fixing it would have prevented the issues introduced in the
work-around, so it is probably a good idea to fix the underlying problem as
the way forward.

The second issue is being able to filter row groups, but skip the
record-level filtering. This should be a really easy fix in the read path,
so please open an issue for it. I think you should be able to get this into
the 1.9.0 release. Also, this is somewhat related to the vectorized read
API we're putting together a hackathon to tackle, so you may want to
monitor that effort.

rb

On Thu, Jul 7, 2016 at 7:47 AM, Cheng Lian <li...@databricks.com> wrote:

> One commonly seen ETL use case of Spark is inferring a schema
> automatically from JSON datasets and then converting them into Parquet. In
> such use cases, schema evolution support can be crucial. Reading from
> Parquet files with different but compatible schemata is quite common.
> Schema evolution combined with filter push-down can be a source of bugs.
> PARQUET-389 is an example of this kind of bug. To work around PARQUET-389,
> we made some non-trivial changes in Spark (SPARK-11955), which in turn led
> to SPARK-16371.
>
> From the perspective of performance, I totally agree that row group level
> filtering is valuable. I think the real problem here is that record-level
> filtering is mandatory if the engine decides to use filter push-down. For
> engines with a vectorized Parquet reader, like Spark, Parquet's built-in
> record-level filtering is not performant enough. Actually, we observed that
> disabling filter push-down may even result in better performance when the
> data is not prepared for row group level filtering, because the filter
> predicates are then evaluated on the Spark side with the help of codegen. I
> think one possible improvement we can make here is to make record-level
> filtering optional. In this way, we may benefit from both Parquet's built-in
> row group level filtering and the faster record-level filtering provided by
> upper-level engines. Of course, when record-level filtering is disabled,
> engines themselves are responsible for doing the filtering.
>
> Cheng


-- 
Ryan Blue
Software Engineer
Netflix

Re: parquet-mr filter pushdown

Posted by Cheng Lian <li...@databricks.com>.
One commonly seen ETL use case of Spark is inferring a schema
automatically from JSON datasets and then converting them into Parquet. In
such use cases, schema evolution support can be crucial. Reading from
Parquet files with different but compatible schemata is quite common.
Schema evolution combined with filter push-down can be a source of bugs.
PARQUET-389 is an example of this kind of bug. To work around
PARQUET-389, we made some non-trivial changes in Spark (SPARK-11955),
which in turn led to SPARK-16371.

From the perspective of performance, I totally agree that row group
level filtering is valuable. I think the real problem here is that
record-level filtering is mandatory if the engine decides to use filter
push-down. For engines with a vectorized Parquet reader, like Spark,
Parquet's built-in record-level filtering is not performant enough.
Actually, we observed that disabling filter push-down may even result in
better performance when the data is not prepared for row group level
filtering, because the filter predicates are then evaluated on the Spark
side with the help of codegen. I think one possible improvement we can
make here is to make record-level filtering optional. In this way, we may
benefit from both Parquet's built-in row group level filtering and the
faster record-level filtering provided by upper-level engines. Of course,
when record-level filtering is disabled, engines themselves are
responsible for doing the filtering.
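
The proposal above can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not parquet-mr or Spark code: `RowGroup`, `read`, and the `record_filter` flag are hypothetical names, and the predicate is fixed to `x >= lower_bound` to keep the example short.

```python
from typing import Dict, Iterator, List

Row = Dict[str, int]


class RowGroup:
    """Hypothetical row group: its rows plus min/max statistics for column x."""

    def __init__(self, rows: List[Row]) -> None:
        self.rows = rows
        self.min_x = min(r["x"] for r in rows)
        self.max_x = max(r["x"] for r in rows)


def read(row_groups: List[RowGroup], lower_bound: int,
         record_filter: bool = True) -> Iterator[Row]:
    """Yield rows for the predicate x >= lower_bound.

    Row groups whose max is below the bound are always skipped.
    Rows inside surviving groups are checked only if record_filter is True.
    """
    for group in row_groups:
        if group.max_x < lower_bound:
            continue  # row group level filtering: skip the whole group
        for row in group.rows:
            if record_filter and row["x"] < lower_bound:
                continue  # record-level filtering done by the reader
            yield row


groups = [
    RowGroup([{"x": 1}, {"x": 3}]),  # max=3, skipped for x >= 5
    RowGroup([{"x": 4}, {"x": 9}]),  # max=9, must be read
]

# Record-level filtering left on: the reader returns only matching rows.
filtered_by_reader = list(read(groups, 5))

# Record-level filtering off: the engine receives every row of the
# surviving group and applies the predicate itself.
unfiltered = list(read(groups, 5, record_filter=False))
filtered_by_engine = [r for r in unfiltered if r["x"] >= 5]
```

Both paths end with the same rows; the difference is only who evaluates the predicate on individual records.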

Cheng



On 7/7/16 2:43 AM, Ryan Blue wrote:
> Hi Reynold,
>
> Parquet uses the same predicates that are passed to the reader (via 
> withFilter [1]) for both record-level and row group filtering. We've 
> found that the main benefit is when they can be used to eliminate 
> entire row groups.
>
> What bugs have you found? I've not seen problems with the filtering 
> done by Parquet so I'm surprised that you guys have seen so many 
> (presumably that you've tracked to Parquet push-down?) that it doesn't 
> seem worth it.
>
> Both record and row group filtering use the same predicates. Record
> filtering evaluates a predicate using an assembled record, so it is
> probably slower than filtering in Spark SQL. Filtering inside Parquet is
> still faster for engines like Pig, which don't have vectorized reads and
> would need additional calls on top of the Parquet layer. Also, the 2.0
> spec makes it possible to filter individual data pages, but this hasn't
> been implemented.
>
> In contrast to record-level filtering, row group filtering is *very* valuable
> when data is correctly prepared. We have datasets where row group 
> filtering gets us a 20-100x speedup (measured in Pig, Presto, and 
> Spark) because we only need to read 1% of the data. This uses 
> column-level stats from the footer and dictionaries to eliminate row 
> groups that can't satisfy the query predicate. For example, for a 
> column with min=5, max=26 and a predicate x < 0, we know that there 
> are no matching values. Similarly, we can look at a dictionary and see 
> all of the possible values and eliminate a row group if none of them 
> match the predicate. Row group filtering works best with the data 
> sorted within partitions by common query columns.
>
> rb
>
> [1]: 
> https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java#L190


Re: parquet-mr filter pushdown

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Hi Reynold,

Parquet uses the same predicates that are passed to the reader (via
withFilter [1]) for both record-level and row group filtering. We've found
that the main benefit is when they can be used to eliminate entire row
groups.

What bugs have you found? I've not seen problems with the filtering done by
Parquet so I'm surprised that you guys have seen so many (presumably that
you've tracked to Parquet push-down?) that it doesn't seem worth it.

Both record and row group filtering use the same predicates. Record
filtering evaluates a predicate using an assembled record, so it is
probably slower than filtering in Spark SQL. Filtering inside Parquet is
still faster for engines like Pig, which don't have vectorized reads and
would need additional calls on top of the Parquet layer. Also, the 2.0 spec
makes it possible to filter individual data pages, but this hasn't been
implemented.

In contrast to record-level filtering, row group filtering is *very* valuable when
data is correctly prepared. We have datasets where row group filtering gets
us a 20-100x speedup (measured in Pig, Presto, and Spark) because we only
need to read 1% of the data. This uses column-level stats from the footer
and dictionaries to eliminate row groups that can't satisfy the query
predicate. For example, for a column with min=5, max=26 and a predicate x <
0, we know that there are no matching values. Similarly, we can look at a
dictionary and see all of the possible values and eliminate a row group if
none of them match the predicate. Row group filtering works best with the
data sorted within partitions by common query columns.
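
As a rough illustration of the checks described above, here is a minimal Python sketch. It is not parquet-mr code: `ColumnStats`, `might_match_less_than`, `might_match_equals`, and `groups_to_read` are hypothetical names, and the statistics are computed in memory rather than read from a file footer.

```python
from dataclasses import dataclass
from typing import List, Optional, Set


@dataclass
class ColumnStats:
    """Hypothetical stand-in for one column's metadata in one row group."""
    min_value: int
    max_value: int
    dictionary: Optional[Set[int]] = None  # distinct values, when available


def might_match_less_than(stats: ColumnStats, bound: int) -> bool:
    """Return False only if no value in the row group can satisfy x < bound."""
    return stats.min_value < bound


def might_match_equals(stats: ColumnStats, value: int) -> bool:
    """Return False only if no value in the row group can satisfy x == value."""
    if value < stats.min_value or value > stats.max_value:
        return False  # outside the min/max range
    if stats.dictionary is not None:
        return value in stats.dictionary  # the dictionary lists every value
    return True  # the row group cannot be ruled out


def groups_to_read(values: List[int], group_size: int, bound: int) -> int:
    """Count row groups that survive min/max pruning for x < bound."""
    count = 0
    for start in range(0, len(values), group_size):
        chunk = values[start:start + group_size]
        if might_match_less_than(ColumnStats(min(chunk), max(chunk)), bound):
            count += 1
    return count


# The example from the message: min=5, max=26 cannot satisfy x < 0.
skip_by_stats = not might_match_less_than(ColumnStats(5, 26), 0)

# min/max alone cannot exclude x == 7, but the dictionary can.
skip_by_dictionary = not might_match_equals(ColumnStats(5, 26, {5, 10, 26}), 7)

# Sorting narrows each row group's min/max range, so fewer groups overlap x < 10.
values = [(i * 37) % 100 for i in range(100)]  # 0..99 in a scrambled order
unsorted_groups = groups_to_read(values, 10, 10)
sorted_groups = groups_to_read(sorted(values), 10, 10)
```

With this scrambled input, min/max pruning still has to read 9 of the 10 row groups; after sorting, only 1 remains.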

rb

[1]:
https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java#L190

On Wed, Jul 6, 2016 at 11:13 AM, Reynold Xin <rx...@databricks.com> wrote:

> Among the people working on Spark there is a lot of confusion about what
> Parquet's filter pushdown actually accomplishes. Depending on who I talk
> to, I get "it filters rows one by one" or "it skips blocks via min/max
> value tracking". Can I get a more official response on this?
>
> The reason I'm asking is that we have seen so many bugs related to filter
> pushdown (either bugs in Parquet, or bugs in Spark's implementation of it)
> that we are considering just permanently disabling filter pushdown, if the
> performance gain is not enormous.
>
> Let me know. Thanks.
>



-- 
Ryan Blue
Software Engineer
Netflix