Posted to user@spark.apache.org by kant kodali <ka...@gmail.com> on 2016/11/17 06:03:22 UTC

How does predicate push down really help?

How does predicate push down really help in the following cases?

val df1 = spark.sql("select * from users where age > 30")

                             vs

val df1 = spark.sql("select * from users")
df1.filter("age > 30")

Re: How does predicate push down really help?

Posted by kant kodali <ka...@gmail.com>.
Thanks for the effort and clear explanation.


Re: How does predicate push down really help?

Posted by kant kodali <ka...@gmail.com>.
Yes, that's how I understood it from your first email as well, but the key thing here sounds like this: some data sources may not support operators such as filter, in which case Spark still needs to work, applying the filter operation in memory after pulling all the rows into memory.
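
As a rough sketch of that fallback (the data below is invented, and it assumes the usual SparkSession named spark): with an in-memory Dataset there is simply nothing to push the predicate into, so the filter runs inside Spark.

import spark.implicits._

val users = Seq(("alice", 25), ("bob", 42), ("carol", 35)).toDF("name", "age")

// "age > 30" is evaluated by Spark itself over rows it already holds.
// Depending on the Spark version, explain(true) shows either a Filter
// operator above the local scan or the rows already filtered at planning
// time; either way, nothing is pushed to an external system.
val adults = users.filter("age > 30")
adults.explain(true)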




RE: How does predicate push down really help?

Posted by "Mendelson, Assaf" <As...@rsa.com>.
In the first example, you define the table to be the users table from some SQL server. Then you perform a filter.
Without predicate pushdown (or any other optimization), Spark basically understands this as follows:
“grab the data from the source described” (which in this case means pulling the entire table from the external SQL server into Spark memory)
“do the operations I asked for” (in this case, filtering).
What predicate pushdown means in this case is that, since Spark knows the external SQL server can understand and benefit from the filter condition, it can send the filter as part of the query, so that by the time the data arrives in Spark it is already filtered.
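
As a concrete sketch of that first case (the JDBC URL, table name and credentials below are made-up placeholders, not details from this thread), reading the users table over JDBC and filtering it would look roughly like this, with the JDBC source expected to hand the predicate to the database:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("pushdown-demo").getOrCreate()

// Read a hypothetical "users" table over JDBC; URL, table name and
// credentials are placeholders.
val users = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://dbhost:3306/mydb")
  .option("dbtable", "users")
  .option("user", "spark")
  .option("password", "secret")
  .load()

// With predicate pushdown, the JDBC source turns this filter into a WHERE
// clause in the SQL it sends to the server, so only matching rows cross the
// network. Without pushdown, every row would be fetched and the filter
// would run inside Spark.
val adults = users.filter("age > 30")

// The physical plan should list the condition under "PushedFilters".
adults.explain(true)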

In the second example we have two tables, A and B. What you ask for in the command is:
“Read A”
“Read B”
“Perform the join” (which is a heavy operation)
“Perform the filtering on the result”

What predicate pushdown would do instead is translate it to:
“Read A”
“Perform filtering on A”
“Read B”
“Perform filtering on B”
“Perform the join on the filtered A and B”
Now the join is performed on smaller data (after the filtering) and therefore takes less time. The heuristic is that in most cases the time saved on the join far outweighs any extra time spent on the filter itself.
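
To make the join case concrete, here is a minimal sketch with two hypothetical in-memory tables A and B (the data is invented for illustration; it assumes a SparkSession named spark). The filter is written after the join, but the optimizer moves it below the join:

import spark.implicits._

val a = Seq((1, "a1"), (2, "a2"), (3, "a3")).toDF("key", "aVal")
val b = Seq((1, "b1"), (2, "b2"), (4, "b4")).toDF("key", "bVal")

// Written as "join first, filter afterwards" ...
val result = a.join(b, "key").filter($"key" > 1)

// ... but in the optimized plan the predicate is pushed below the join
// (and, depending on the Spark version, inferred on the other side too),
// so the join runs on already-filtered data.
result.explain(true)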

By the way, you can see the difference between the original plan and the optimized plan by calling explain(true) on the DataFrame. This shows you what was parsed, how the optimization worked, and what was physically run.
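
For example (this assumes a SparkSession named spark and a registered users table, as in the snippets in this thread):

val df = spark.sql("select * from users where age > 30")

// explain(true) prints one section per stage of the query lifecycle
// (titles as printed by Spark 2.x; exact wording may vary by version):
//   == Parsed Logical Plan ==    what was parsed
//   == Analyzed Logical Plan ==  after resolving tables and columns
//   == Optimized Logical Plan == after rules such as predicate pushdown
//   == Physical Plan ==          what actually runs
df.explain(true)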

Assaf.


Re: How does predicate push down really help?

Posted by Ashic Mahtab <as...@live.com>.
Consider a data source that stores its data in 500 MB files and doesn't support predicate push down. Spark has to load all of the data into memory before it can apply filtering, select columns, etc.; each 500 MB file will at some point have to be loaded entirely into memory. Now consider a data source that does support predicate push down, like MySQL. Spark only needs to retrieve the rows and columns it needs, because the database provides an interface for it to do so. If the underlying data source supports predicate push down, and the corresponding connector supports it, then filtering, projection, etc. are pushed down to the storage level. If not, the full dataset has to be loaded into memory, and filtering, projection, etc. happen there.
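
As a sketch of the pushdown-capable side (the file path is hypothetical and this assumes a SparkSession named spark), Parquet hands both the projection and the predicate to the file scan:

val users = spark.read.parquet("/data/users.parquet")

// Only "name" and "age" are read, and row groups whose statistics rule out
// age > 30 can be skipped entirely.
val adults = users.select("name", "age").filter("age > 30")

// The FileScan node in the physical plan should report the pruned schema
// and a "PushedFilters" list; a source without this support would instead
// show a full scan followed by a separate in-memory Filter.
adults.explain(true)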





Re: How does predicate push down really help?

Posted by kant kodali <ka...@gmail.com>.
Hi Assaf,

I am still trying to understand the merits of predicate push down from the
examples you pointed out.

Example 1: Say we don't have a predicate push down feature. Why does Spark
need to pull all the rows and filter them in memory? Why not simply issue a
select statement with a "where" clause to do the filtering via JDBC or
something?

Example 2: Same argument as Example 1, except that when we don't have a
predicate push down feature we could simply do it using JOIN and WHERE
operators in the SQL statement, right?

I feel like I am missing something to understand the merits of predicate
push down.

Thanks,
kant





RE: How does predicate push down really help?

Posted by "Mendelson, Assaf" <As...@rsa.com>.
Actually, both of your examples translate to the same plan.
When you call sql(“some query”) or filter, it doesn’t actually run the query. Instead it is translated into a plan (the parsed plan) that transforms everything into standard Spark expressions. Spark then analyzes it to fill in the blanks (what the users table is, for example) and attempts to optimize it. Predicate pushdown happens in the optimization step.
For example, let’s say that users is actually backed by a table in MySQL, accessed over an SQL query.
Without predicate pushdown, Spark would first pull the entire users table from MySQL and only then do the filtering. With predicate pushdown, the filtering is done as part of the SQL query sent to MySQL.
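
A small sketch of both points (the JDBC connection details are placeholders, reusing the hypothetical MySQL table from the sketch earlier in this thread): the two formulations from the original question end up with the same optimized plan, and the predicate travels to MySQL either way.

val users = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://dbhost:3306/mydb")
  .option("dbtable", "users")
  .option("user", "spark")
  .option("password", "secret")
  .load()
users.createOrReplaceTempView("users")

// Filter written inside the SQL text ...
spark.sql("select * from users where age > 30").explain(true)

// ... or applied afterwards on the DataFrame: after optimization both plans
// carry the same pushed-down predicate, so MySQL does the filtering in
// both cases.
spark.sql("select * from users").filter("age > 30").explain(true)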

Another (probably better) example would be two tables, A and B, that are joined on some common key, with a filter applied to that key. Moving the filter to before the join will usually make everything faster, since a filter is a much cheaper operation than a join.

Assaf.
