Posted to user@spark.apache.org by Telmo Rodrigues <te...@gmail.com> on 2016/05/11 00:57:10 UTC

Spark 1.6 Catalyst optimizer

Hello,

I have a question about the Catalyst optimizer in Spark 1.6.

initial logical plan:

!'Project [unresolvedalias(*)]
!+- 'Filter ('t.id = 1)
!   +- 'Join Inner, Some(('t.id = 'u.id))
!      :- 'UnresolvedRelation `t`, None
!      +- 'UnresolvedRelation `u`, None


logical plan after optimizer execution:

Project [id#0L,id#1L]
!+- Filter (id#0L = cast(1 as bigint))
!   +- Join Inner, Some((id#0L = id#1L))
!      :- Subquery t
!          :  +- Relation[id#0L] JSONRelation
!      +- Subquery u
!          +- Relation[id#1L] JSONRelation


Shouldn't the optimizer push the predicate down to subquery t so that the
filter is executed before the join?
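
For reference, a direct way to inspect what the optimizer actually produced
(a sketch against the Spark 1.6 QueryExecution API, assuming the plans above
come from a query like the one below over temp tables t and u):

// Build the DataFrame, then print the optimizer's output directly
// instead of reading it off the rule-trace logs.
val d3 = sqlContext.sql("select * from t join u on t.id = u.id where t.id = 1")
println(d3.queryExecution.optimizedPlan)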

Thanks

Re: Spark 1.6 Catalyst optimizer

Posted by Telmo Rodrigues <te...@gmail.com>.
Thank you Takeshi.

After executing d3.explain(true) I realised that the optimiser batches are
being run, including the predicate push down.

I think that only the analyser batches are executed when creating the
DataFrame via context.sql(query). It seems that the optimiser batches are
executed only when some action like collect or explain takes place.
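
A quick way to see this split (a sketch; in the 1.6 code base, analyzed and
optimizedPlan are lazy vals on the DataFrame's QueryExecution):

// Analysis is forced eagerly when the DataFrame is constructed by sql()...
val d3 = sqlContext.sql("select * from t join u on t.id = u.id where t.id = 1")
println(d3.queryExecution.analyzed)

// ...while the optimiser batches only run on first access to the optimized
// plan, e.g. via explain(true) or an action such as collect().
println(d3.queryExecution.optimizedPlan)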

scala> d3.explain(true)
16/05/13 02:08:12 DEBUG Analyzer$ResolveReferences: Resolving 't.id to id#0L
16/05/13 02:08:12 DEBUG Analyzer$ResolveReferences: Resolving 'u.id to id#1L
16/05/13 02:08:12 DEBUG Analyzer$ResolveReferences: Resolving 't.id to id#0L
16/05/13 02:08:12 DEBUG SQLContext$$anon$1:
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
+- 'Filter ('t.id = 1)
   +- 'Join Inner, Some(('t.id = 'u.id))
      :- 'UnresolvedRelation `t`, None
      +- 'UnresolvedRelation `u`, None

== Analyzed Logical Plan ==
id: bigint, id: bigint
Project [id#0L,id#1L]
+- Filter (id#0L = cast(1 as bigint))
   +- Join Inner, Some((id#0L = id#1L))
      :- Subquery t
      :  +- Relation[id#0L] JSONRelation
      +- Subquery u
         +- Relation[id#1L] JSONRelation

== Optimized Logical Plan ==
Project [id#0L,id#1L]
+- Join Inner, Some((id#0L = id#1L))
   :- Filter (id#0L = 1)
   :  +- Relation[id#0L] JSONRelation
   +- Relation[id#1L] JSONRelation

== Physical Plan ==
Project [id#0L,id#1L]
+- BroadcastHashJoin [id#0L], [id#1L], BuildRight
   :- Filter (id#0L = 1)
   :  +- Scan JSONRelation[id#0L] InputPaths: file:/persons.json,
PushedFilters: [EqualTo(id,1)]
   +- Scan JSONRelation[id#1L] InputPaths: file:/cars.json


2016-05-12 16:34 GMT+01:00 Takeshi Yamamuro <li...@gmail.com>:

> Hi,
>
> What's the result of `d3.explain(true)`?
>
> // maropu
>
> On Thu, May 12, 2016 at 10:04 AM, Telmo Rodrigues <
> telmo.galante.rodrigues@gmail.com> wrote:
>
>> I'm building Spark from branch-1.6 source with mvn -DskipTests package
>> and I'm running the following code in the spark shell.
>>
>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>
>> import sqlContext.implicits._
>>
>> val df = sqlContext.read.json("persons.json")
>> val df2 = sqlContext.read.json("cars.json")
>>
>> df.registerTempTable("t")
>> df2.registerTempTable("u")
>>
>> val d3 = sqlContext.sql("select * from t join u on t.id = u.id where t.id = 1")
>>
>> With the log4j root category level WARN, the last printed messages refer
>> to the Batch Resolution rules execution.
>>
>> === Result of Batch Resolution ===
>> !'Project [unresolvedalias(*)]              Project [id#0L,id#1L]
>> !+- 'Filter ('t.id = 1)                     +- Filter (id#0L = cast(1 as bigint))
>> !   +- 'Join Inner, Some(('t.id = 'u.id))      +- Join Inner, Some((id#0L = id#1L))
>> !      :- 'UnresolvedRelation `t`, None           :- Subquery t
>> !      +- 'UnresolvedRelation `u`, None           :  +- Relation[id#0L] JSONRelation
>> !                                                 +- Subquery u
>> !                                                    +- Relation[id#1L] JSONRelation
>>
>>
>> I think that only the analyser rules are being executed.
>>
>> Shouldn't the optimiser rules run in this case?
>>
>> 2016-05-11 19:24 GMT+01:00 Michael Armbrust <mi...@databricks.com>:
>>
>>>
>>>> logical plan after optimizer execution:
>>>>
>>>> Project [id#0L,id#1L]
>>>> !+- Filter (id#0L = cast(1 as bigint))
>>>> !   +- Join Inner, Some((id#0L = id#1L))
>>>> !      :- Subquery t
>>>> !          :  +- Relation[id#0L] JSONRelation
>>>> !      +- Subquery u
>>>> !          +- Relation[id#1L] JSONRelation
>>>>
>>>
>>> I think you are mistaken.  If this was the optimized plan there would be
>>> no subqueries.
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>

Re: Spark 1.6 Catalyst optimizer

Posted by Takeshi Yamamuro <li...@gmail.com>.
Hi,

What's the result of `d3.explain(true)`?

// maropu

On Thu, May 12, 2016 at 10:04 AM, Telmo Rodrigues <
telmo.galante.rodrigues@gmail.com> wrote:

> I'm building Spark from branch-1.6 source with mvn -DskipTests package and
> I'm running the following code in the spark shell.
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
> import sqlContext.implicits._
>
> val df = sqlContext.read.json("persons.json")
> val df2 = sqlContext.read.json("cars.json")
>
> df.registerTempTable("t")
> df2.registerTempTable("u")
>
> val d3 = sqlContext.sql("select * from t join u on t.id = u.id where t.id = 1")
>
> With the log4j root category level WARN, the last printed messages refer
> to the Batch Resolution rules execution.
>
> === Result of Batch Resolution ===
> !'Project [unresolvedalias(*)]              Project [id#0L,id#1L]
> !+- 'Filter ('t.id = 1)                     +- Filter (id#0L = cast(1 as bigint))
> !   +- 'Join Inner, Some(('t.id = 'u.id))      +- Join Inner, Some((id#0L = id#1L))
> !      :- 'UnresolvedRelation `t`, None           :- Subquery t
> !      +- 'UnresolvedRelation `u`, None           :  +- Relation[id#0L] JSONRelation
> !                                                 +- Subquery u
> !                                                    +- Relation[id#1L] JSONRelation
>
>
> I think that only the analyser rules are being executed.
>
> Shouldn't the optimiser rules run in this case?
>
> 2016-05-11 19:24 GMT+01:00 Michael Armbrust <mi...@databricks.com>:
>
>>
>>> logical plan after optimizer execution:
>>>
>>> Project [id#0L,id#1L]
>>> !+- Filter (id#0L = cast(1 as bigint))
>>> !   +- Join Inner, Some((id#0L = id#1L))
>>> !      :- Subquery t
>>> !          :  +- Relation[id#0L] JSONRelation
>>> !      +- Subquery u
>>> !          +- Relation[id#1L] JSONRelation
>>>
>>
>> I think you are mistaken.  If this was the optimized plan there would be
>> no subqueries.
>>
>
>


-- 
---
Takeshi Yamamuro

Re: Spark 1.6 Catalyst optimizer

Posted by Telmo Rodrigues <te...@gmail.com>.
I'm building Spark from branch-1.6 source with mvn -DskipTests package and
I'm running the following code in the spark shell.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext.implicits._

val df = sqlContext.read.json("persons.json")
val df2 = sqlContext.read.json("cars.json")

df.registerTempTable("t")
df2.registerTempTable("u")

val d3 = sqlContext.sql("select * from t join u on t.id = u.id where t.id = 1")

With the log4j root category level WARN, the last printed messages refer
to the Batch Resolution rules execution.

=== Result of Batch Resolution ===
!'Project [unresolvedalias(*)]              Project [id#0L,id#1L]
!+- 'Filter ('t.id = 1)                     +- Filter (id#0L = cast(1 as bigint))
!   +- 'Join Inner, Some(('t.id = 'u.id))      +- Join Inner, Some((id#0L = id#1L))
!      :- 'UnresolvedRelation `t`, None           :- Subquery t
!      +- 'UnresolvedRelation `u`, None           :  +- Relation[id#0L] JSONRelation
!                                                 +- Subquery u
!                                                    +- Relation[id#1L] JSONRelation


I think that only the analyser rules are being executed.

Shouldn't the optimiser rules run in this case?

2016-05-11 19:24 GMT+01:00 Michael Armbrust <mi...@databricks.com>:

>
>> logical plan after optimizer execution:
>>
>> Project [id#0L,id#1L]
>> !+- Filter (id#0L = cast(1 as bigint))
>> !   +- Join Inner, Some((id#0L = id#1L))
>> !      :- Subquery t
>> !          :  +- Relation[id#0L] JSONRelation
>> !      +- Subquery u
>> !          +- Relation[id#1L] JSONRelation
>>
>
> I think you are mistaken.  If this was the optimized plan there would be
> no subqueries.
>

Re: Spark 1.6 Catalyst optimizer

Posted by Michael Armbrust <mi...@databricks.com>.
>
>
> logical plan after optimizer execution:
>
> Project [id#0L,id#1L]
> !+- Filter (id#0L = cast(1 as bigint))
> !   +- Join Inner, Some((id#0L = id#1L))
> !      :- Subquery t
> !          :  +- Relation[id#0L] JSONRelation
> !      +- Subquery u
> !          +- Relation[id#1L] JSONRelation
>

I think you are mistaken.  If this was the optimized plan there would be no
subqueries.
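
For what it's worth, in the 1.6 code base the optimizer's first batch applies
the EliminateSubQueries rule, which is why Subquery wrappers cannot survive
optimization. A small sketch of applying it by hand to the analyzed plan
(assuming a DataFrame d3 as elsewhere in the thread):

import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries

// Applying just this one rule to the analyzed plan already strips the
// Subquery t / Subquery u wrappers shown above.
println(EliminateSubQueries(d3.queryExecution.analyzed))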

Re: Spark 1.6 Catalyst optimizer

Posted by Rishi Mishra <rm...@snappydata.io>.
Will try with a JSON relation, but with Spark's temp tables (Spark version
1.6) I get an optimized plan as you have mentioned. It should not be much
different though.

Query : "select t1.col2, t1.col3 from t1, t2 where t1.col1=t2.col1 and
t1.col3=7"

Plan :

Project [COL2#1,COL3#2]
+- Join Inner, Some((COL1#0 = COL1#3))
   :- Filter (COL3#2 = 7)
   :  +- LogicalRDD [col1#0,col2#1,col3#2], MapPartitionsRDD[4] at apply at Transformer.scala:22
   +- Project [COL1#3]
      +- LogicalRDD [col1#3,col2#4,col3#5], MapPartitionsRDD[5] at apply at Transformer.scala:22
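
For completeness, a rough sketch of how such temp tables can be set up (the
data and setup here are illustrative; the original is not shown in the
thread):

import sqlContext.implicits._

// Two small RDD-backed DataFrames registered as temp tables t1 and t2;
// these show up as LogicalRDD nodes in the plan above.
val t1 = sc.parallelize(Seq((1, "a", 7), (2, "b", 8))).toDF("col1", "col2", "col3")
val t2 = sc.parallelize(Seq((1, "x", 9), (2, "y", 7))).toDF("col1", "col2", "col3")
t1.registerTempTable("t1")
t2.registerTempTable("t2")

sqlContext.sql("select t1.col2, t1.col3 from t1, t2 " +
  "where t1.col1 = t2.col1 and t1.col3 = 7").explain(true)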

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Wed, May 11, 2016 at 4:56 PM, Telmo Rodrigues <
telmo.galante.rodrigues@gmail.com> wrote:

> In this case, isn't it better to perform the filter as early as possible
> even if there could be unhandled predicates?
>
> Telmo Rodrigues
>
> No dia 11/05/2016, às 09:49, Rishi Mishra <rm...@snappydata.io>
> escreveu:
>
> It does push the predicate down. But as relations are generic and might or
> might not handle some of the predicates, it needs to apply a filter for
> the unhandled predicates.
>
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>
> On Wed, May 11, 2016 at 6:27 AM, Telmo Rodrigues <
> telmo.galante.rodrigues@gmail.com> wrote:
>
>> Hello,
>>
>> I have a question about the Catalyst optimizer in Spark 1.6.
>>
>> initial logical plan:
>>
>> !'Project [unresolvedalias(*)]
>> !+- 'Filter ('t.id = 1)
>> !   +- 'Join Inner, Some(('t.id = 'u.id))
>> !      :- 'UnresolvedRelation `t`, None
>> !      +- 'UnresolvedRelation `u`, None
>>
>>
>> logical plan after optimizer execution:
>>
>> Project [id#0L,id#1L]
>> !+- Filter (id#0L = cast(1 as bigint))
>> !   +- Join Inner, Some((id#0L = id#1L))
>> !      :- Subquery t
>> !          :  +- Relation[id#0L] JSONRelation
>> !      +- Subquery u
>> !          +- Relation[id#1L] JSONRelation
>>
>>
>> Shouldn't the optimizer push the predicate down to subquery t so that the
>> filter is executed before the join?
>>
>> Thanks
>>
>>
>>
>

Re: Spark 1.6 Catalyst optimizer

Posted by Telmo Rodrigues <te...@gmail.com>.
In this case, isn't it better to perform the filter as early as possible even if there could be unhandled predicates?

Telmo Rodrigues

No dia 11/05/2016, às 09:49, Rishi Mishra <rm...@snappydata.io> escreveu:

> It does push the predicate down. But as relations are generic and might or might not handle some of the predicates, it needs to apply a filter for the unhandled predicates.
> 
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
> 
> https://in.linkedin.com/in/rishiteshmishra
> 
>> On Wed, May 11, 2016 at 6:27 AM, Telmo Rodrigues <te...@gmail.com> wrote:
>> Hello,
>> 
>> I have a question about the Catalyst optimizer in Spark 1.6.
>> 
>> initial logical plan:
>> 
>> !'Project [unresolvedalias(*)]
>> !+- 'Filter ('t.id = 1)
>> !   +- 'Join Inner, Some(('t.id = 'u.id))
>> !      :- 'UnresolvedRelation `t`, None
>> !      +- 'UnresolvedRelation `u`, None
>> 
>> 
>> logical plan after optimizer execution:
>> 
>> Project [id#0L,id#1L]
>> !+- Filter (id#0L = cast(1 as bigint))
>> !   +- Join Inner, Some((id#0L = id#1L))
>> !      :- Subquery t
>> !          :  +- Relation[id#0L] JSONRelation
>> !      +- Subquery u
>> !          +- Relation[id#1L] JSONRelation
>> 
>> 
>> Shouldn't the optimizer push the predicate down to subquery t so that the filter is executed before the join?
>> 
>> Thanks
> 

Re: Spark 1.6 Catalyst optimizer

Posted by Rishi Mishra <rm...@snappydata.io>.
It does push the predicate down. But as relations are generic and might or
might not handle some of the predicates, it needs to apply a filter for
the unhandled predicates.
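
For context, this is the contract Spark 1.6 introduced on BaseRelation: a
source reports which of the pushed filters it could not evaluate, and
Catalyst keeps a residual Filter node for exactly those. A minimal sketch
(the relation itself is a stub):

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter}
import org.apache.spark.sql.types.StructType

class StubRelation extends BaseRelation {
  override def sqlContext: SQLContext = ???
  override def schema: StructType = ???

  // Filters returned here are the ones this source cannot evaluate;
  // Spark applies them itself in a Filter node above the scan.
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot {
      case EqualTo(_, _) => true  // pretend only equality is handled natively
      case _             => false
    }
}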

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Wed, May 11, 2016 at 6:27 AM, Telmo Rodrigues <
telmo.galante.rodrigues@gmail.com> wrote:

> Hello,
>
> I have a question about the Catalyst optimizer in Spark 1.6.
>
> initial logical plan:
>
> !'Project [unresolvedalias(*)]
> !+- 'Filter ('t.id = 1)
> !   +- 'Join Inner, Some(('t.id = 'u.id))
> !      :- 'UnresolvedRelation `t`, None
> !      +- 'UnresolvedRelation `u`, None
>
>
> logical plan after optimizer execution:
>
> Project [id#0L,id#1L]
> !+- Filter (id#0L = cast(1 as bigint))
> !   +- Join Inner, Some((id#0L = id#1L))
> !      :- Subquery t
> !          :  +- Relation[id#0L] JSONRelation
> !      +- Subquery u
> !          +- Relation[id#1L] JSONRelation
>
>
> Shouldn't the optimizer push the predicate down to subquery t so that the
> filter is executed before the join?
>
> Thanks
>
>
>