Posted to user@spark.apache.org by William Wong <wi...@gmail.com> on 2019/06/14 16:13:55 UTC

Filter cannot be pushed via a Join

Dear all,

I created two tables.

scala> spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string)
USING PARQUET");
19/06/14 23:49:10 WARN ObjectStore: Version information not found in
metastore. hive.metastore.schema.verification is not enabled so recording
the schema version 1.2.0
19/06/14 23:49:11 WARN ObjectStore: Failed to get database default,
returning NoSuchObjectException
res1: org.apache.spark.sql.DataFrame = []

scala> spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string)
USING PARQUET");
res2: org.apache.spark.sql.DataFrame = []


Here is the plan of joining these two tables via the ID column. It looks good
to me, as the filter 'id = 'a'' is pushed to both tables as expected.

scala> spark.sql("SELECT * FROM table2 t1, table2 t2 WHERE t1.id = t2.id
AND t1.id ='a'").explain
== Physical Plan ==
*(2) BroadcastHashJoin [id#23], [id#68], Inner, BuildRight
:- *(2) Project [id#23, val#24]
:  +- *(2) Filter (isnotnull(id#23) && (id#23 = a))
:     +- *(2) FileScan parquet default.table2[id#23,val#24] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
*PartitionFilters:
[], PushedFilters: [IsNotNull(id), EqualTo(id,a)],* ReadSchema:
struct<id:string,val:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
true]))
   +- *(1) Project [id#68, val#69]
      +- *(1) Filter ((id#68 = a) && isnotnull(id#68))
         +- *(1) FileScan parquet default.table2[id#68,val#69] Batched:
true, Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
*PartitionFilters:
[], PushedFilters: [EqualTo(id,a), IsNotNull(id)],* ReadSchema:
struct<id:string,val:string>


In our case, we created a view on table1 by unioning a few partitions, like this:

scala> spark.sql("""
     | CREATE VIEW partitioned_table_1 AS
     | SELECT * FROM table1 WHERE id = 'a'
     | UNION ALL
     | SELECT * FROM table1 WHERE id = 'b'
     | UNION ALL
     | SELECT * FROM table1 WHERE id = 'c'
     | UNION ALL
     | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
     | """.stripMargin)
res7: org.apache.spark.sql.DataFrame = []


In theory, selecting data via this view 'partitioned_table_1' should return
the same result as selecting via the table 'table1'.
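
As a quick sanity check on that claim (purely illustrative, and ignoring
duplicate rows, since EXCEPT is set-based), one could compare the two
relations in both directions:

scala> spark.sql("SELECT * FROM table1 EXCEPT SELECT * FROM partitioned_table_1").count
scala> spark.sql("SELECT * FROM partitioned_table_1 EXCEPT SELECT * FROM table1").count

Both counts should come back as 0 if the view really covers the same rows.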

This query can also push the filter 'id IN ('a','b','c','d')' to table2 as
expected.

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
t1.id = t2.id AND t1.id IN ('a','b','c','d')").explain
== Physical Plan ==
*(6) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
:- Union
:  :- *(1) Project [id#0, val#1]
:  :  +- *(1) Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
:  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a), In(id,
[a,b,c,d])], ReadSchema: struct<id:string,val:string>
:  :- *(2) Project [id#0, val#1]
:  :  +- *(2) Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
:  :     +- *(2) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,b), In(id,
[a,b,c,d])], ReadSchema: struct<id:string,val:string>
:  :- *(3) Project [id#0, val#1]
:  :  +- *(3) Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
:  :     +- *(3) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,c), In(id,
[a,b,c,d])], ReadSchema: struct<id:string,val:string>
:  +- *(4) Project [id#0, val#1]
:     +- *(4) Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) &&
isnotnull(id#0))
:        +- *(4) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [Not(In(id, [a,b,c])), In(id,
[a,b,c,d]), IsNotNull(id)], ReadSchema: struct<id:string,val:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
true]))
   +- *(5) Project [id#23, val#24]
      +- *(5) Filter ((id#23 IN (a,b,c,d) && ((isnotnull(id#23) && (((id#23
= a) || (id#23 = b)) || (id#23 = c))) || NOT id#23 IN (a,b,c))) &&
isnotnull(id#23))
         +- *(5) FileScan parquet default.table2[id#23,val#24] Batched:
true, Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
PartitionFilters: [], *PushedFilters: [In(id, [a,b,c,d]),
Or(And(IsNotNull(id),Or(Or(EqualTo(id,a),EqualTo(id,b)),EqualTo(id,c))),Not(I...,
*ReadSchema: struct<id:string,val:string>

scala>


However, if we change the filter to 'id = 'a'', something strange happens.
The filter 'id = 'a'' cannot be pushed down to table2...

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
t1.id = t2.id AND t1.id = 'a'").explain
== Physical Plan ==
*(4) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
:- Union
:  :- *(1) Project [id#0, val#1]
:  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
:  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
ReadSchema: struct<id:string,val:string>
:  :- LocalTableScan <empty>, [id#0, val#1]
:  :- LocalTableScan <empty>, [id#0, val#1]
:  +- *(2) Project [id#0, val#1]
:     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 =
a))
:        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
true]))
   +- *(3) Project [id#23, val#24]
      +- *(3) Filter isnotnull(id#23)
         +- *(3) FileScan parquet default.table2[id#23,val#24] Batched:
true, Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema:
struct<id:string,val:string>


I would appreciate it if anyone has an idea about this. Many thanks.

Best regards,
William

Re: Filter cannot be pushed via a Join

Posted by William Wong <wi...@gmail.com>.
Hi Xiao,

Just reported this as JIRA SPARK-28103.

https://issues.apache.org/jira/browse/SPARK-28103

Thanks and Regards,
William

On Wed, 19 Jun 2019 at 1:35 AM, Xiao Li <ga...@gmail.com> wrote:

> Hi, William,
>
> Thanks for reporting it. Could you open a JIRA?
>
> Cheers,
>
> Xiao
>
> William Wong <wi...@gmail.com> wrote on Tue, 18 Jun 2019 at 08:57:
>
>> BTW, I noticed that a workaround is to create a custom optimizer rule that
>> removes empty local relations from a Union. However, I am not 100% sure if
>> it is the right approach.
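>>
>> A minimal sketch of the idea (not necessarily the exact rule, and untested
>> as written; a real version would also need to remap output attributes when
>> the surviving child is not the first one) would be something like:
>>
>> import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Union}
>> import org.apache.spark.sql.catalyst.rules.Rule
>>
>> object RemoveEmptyUnionChildren extends Rule[LogicalPlan] {
>>   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
>>     case u: Union =>
>>       // Drop children that the optimizer already collapsed into empty LocalRelations.
>>       val nonEmpty = u.children.filterNot {
>>         case l: LocalRelation => l.data.isEmpty
>>         case _ => false
>>       }
>>       nonEmpty match {
>>         case Seq()       => LocalRelation(u.output)   // every branch was pruned away
>>         case Seq(single) => single                    // one branch left, Union not needed
>>         case children    => u.copy(children = children)
>>       }
>>   }
>> }
>>
>> // Register it as an extra optimizer rule, e.g.:
>> spark.experimental.extraOptimizations ++= Seq(RemoveEmptyUnionChildren)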
>>
>> On Tue, Jun 18, 2019 at 11:53 PM William Wong <wi...@gmail.com>
>> wrote:
>>
>>> Dear all,
>>>
>>> I am not sure whether this is expected behaviour or not, and whether I
>>> should report it as a bug. Basically, the constraints of a Union can become
>>> empty if any of its children is turned into an empty local relation. The
>>> side effect is that filters cannot be inferred correctly (by
>>> InferFiltersFromConstraints).
>>>
>>> We may reproduce the issue with the following setup:
>>> 1) Prepare two tables:
>>> * spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string)
>>> USING PARQUET");
>>> * spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string)
>>> USING PARQUET");
>>>
>>> 2) Create a union view on table1.
>>> * spark.sql("""
>>>      | CREATE VIEW partitioned_table_1 AS
>>>      | SELECT * FROM table1 WHERE id = 'a'
>>>      | UNION ALL
>>>      | SELECT * FROM table1 WHERE id = 'b'
>>>      | UNION ALL
>>>      | SELECT * FROM table1 WHERE id = 'c'
>>>      | UNION ALL
>>>      | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>>>      | """.stripMargin)
>>>
>>> 3) View the optimized plan of this SQL. The filter 't2.id = 'a'' cannot
>>> be inferred. We can see that the constraints of the left table are empty.
>>>
>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan
>>> res39: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
>>> Join Inner, (id#0 = id#4)
>>> :- Union
>>> :  :- Filter (isnotnull(id#0) && (id#0 = a))
>>> :  :  +- Relation[id#0,val#1] parquet
>>> :  :- LocalRelation <empty>, [id#0, val#1]
>>> :  :- LocalRelation <empty>, [id#0, val#1]
>>> :  +- Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
>>> :     +- Relation[id#0,val#1] parquet
>>> +- Filter isnotnull(id#4)
>>>    +- Relation[id#4,val#5] parquet
>>>
>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id =
>>> 'a'").queryExecution.optimizedPlan.children(0).constraints
>>> res40: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set()
>>>
>>> 4) Modify the query to avoid empty local relations. The filter 't2.id
>>> IN ('a','b','c','d')' is then inferred properly, and the constraints of the
>>> left table are no longer empty.
>>>
>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id IN
>>> ('a','b','c','d')").queryExecution.optimizedPlan
>>> res42: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
>>> Join Inner, (id#0 = id#4)
>>> :- Union
>>> :  :- Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
>>> :  :  +- Relation[id#0,val#1] parquet
>>> :  :- Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
>>> :  :  +- Relation[id#0,val#1] parquet
>>> :  :- Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
>>> :  :  +- Relation[id#0,val#1] parquet
>>> :  +- Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) &&
>>> isnotnull(id#0))
>>> :     +- Relation[id#0,val#1] parquet
>>> +- Filter ((id#4 IN (a,b,c,d) && ((isnotnull(id#4) && (((id#4 = a) ||
>>> (id#4 = b)) || (id#4 = c))) || NOT id#4 IN (a,b,c))) && isnotnull(id#4))
>>>    +- Relation[id#4,val#5] parquet
>>>
>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id IN
>>> ('a','b','c','d')").queryExecution.optimizedPlan.children(0).constraints
>>> res44: org.apache.spark.sql.catalyst.expressions.ExpressionSet =
>>> Set(isnotnull(id#0), id#0 IN (a,b,c,d), ((((id#0 = a) || (id#0 = b)) ||
>>> (id#0 = c)) || NOT id#0 IN (a,b,c)))
>>>
>>>
>>> Thanks and regards,
>>> William
>>>
>>>
>>> On Sat, Jun 15, 2019 at 1:13 AM William Wong <wi...@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I would appreciate any expert help on this strange behavior.
>>>>
>>>> Interestingly, I implemented a custom rule to remove empty LocalRelation
>>>> children under Union and ran the same query. The filter 'id = 'a'' is then
>>>> inferred for table2 and pushed down through the Join.
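>>>>
>>>> For context, 'spark2' below is just a separate session with that
>>>> experimental rule injected. A rough, untested sketch of the setup
>>>> (assuming a rule object like the RemoveEmptyUnionChildren sketch above):
>>>>
>>>> val spark2 = spark.newSession()
>>>> spark2.experimental.extraOptimizations ++= Seq(RemoveEmptyUnionChildren)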
>>>>
>>>> scala> spark2.sql("SELECT * FROM partitioned_table_1 t1, table2 t2
>>>> WHERE t1.id = t2.id AND t1.id = 'a'").explain
>>>> == Physical Plan ==
>>>> *(4) BroadcastHashJoin [id#0], [id#4], Inner, BuildRight
>>>> :- Union
>>>> :  :- *(1) Project [id#0, val#1]
>>>> :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
>>>> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
>>>> true, Format: Parquet, Location:
>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
>>>> ReadSchema: struct<id:string,val:string>
>>>> :  +- *(2) Project [id#0, val#1]
>>>> :     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0
>>>> = a))
>>>> :        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
>>>> true, Format: Parquet, Location:
>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
>>>> EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
>>>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>>>> true]))
>>>>    +- *(3) Project [id#4, val#5]
>>>>       +- *(3) Filter ((id#4 = a) && isnotnull(id#4))
>>>>          +- *(3) FileScan parquet default.table2[id#4,val#5] Batched:
>>>> true, Format: Parquet, Location:
>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
>>>> PartitionFilters: [], *PushedFilters: [EqualTo(id,a), IsNotNull(id)],*
>>>> ReadSchema: struct<id:string,val:string>
>>>>
>>>> scala>
>>>>
>>>> Thanks and regards,
>>>> William
>>>>
>>>>
>>>>
>>>> On Sat, Jun 15, 2019 at 12:13 AM William Wong <wi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Dear all,
>>>>>
>>>>> I created two tables.
>>>>>
>>>>> scala> spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val
>>>>> string) USING PARQUET");
>>>>> 19/06/14 23:49:10 WARN ObjectStore: Version information not found in
>>>>> metastore. hive.metastore.schema.verification is not enabled so recording
>>>>> the schema version 1.2.0
>>>>> 19/06/14 23:49:11 WARN ObjectStore: Failed to get database default,
>>>>> returning NoSuchObjectException
>>>>> res1: org.apache.spark.sql.DataFrame = []
>>>>>
>>>>> scala> spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val
>>>>> string) USING PARQUET");
>>>>> res2: org.apache.spark.sql.DataFrame = []
>>>>>
>>>>>
>>>>> Here is the plan of joining these two tables via the ID column. It looks
>>>>> good to me, as the filter 'id = 'a'' is pushed to both tables as expected.
>>>>>
>>>>> scala> spark.sql("SELECT * FROM table2 t1, table2 t2 WHERE t1.id =
>>>>> t2.id AND t1.id ='a'").explain
>>>>> == Physical Plan ==
>>>>> *(2) BroadcastHashJoin [id#23], [id#68], Inner, BuildRight
>>>>> :- *(2) Project [id#23, val#24]
>>>>> :  +- *(2) Filter (isnotnull(id#23) && (id#23 = a))
>>>>> :     +- *(2) FileScan parquet default.table2[id#23,val#24] Batched:
>>>>> true, Format: Parquet, Location:
>>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], *PartitionFilters:
>>>>> [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],* ReadSchema:
>>>>> struct<id:string,val:string>
>>>>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>>>>> true]))
>>>>>    +- *(1) Project [id#68, val#69]
>>>>>       +- *(1) Filter ((id#68 = a) && isnotnull(id#68))
>>>>>          +- *(1) FileScan parquet default.table2[id#68,val#69]
>>>>> Batched: true, Format: Parquet, Location:
>>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], *PartitionFilters:
>>>>> [], PushedFilters: [EqualTo(id,a), IsNotNull(id)],* ReadSchema:
>>>>> struct<id:string,val:string>
>>>>>
>>>>>
>>>>> In our case, we created a view on table1 by unioning a few partitions,
>>>>> like this:
>>>>>
>>>>> scala> spark.sql("""
>>>>>      | CREATE VIEW partitioned_table_1 AS
>>>>>      | SELECT * FROM table1 WHERE id = 'a'
>>>>>      | UNION ALL
>>>>>      | SELECT * FROM table1 WHERE id = 'b'
>>>>>      | UNION ALL
>>>>>      | SELECT * FROM table1 WHERE id = 'c'
>>>>>      | UNION ALL
>>>>>      | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>>>>>      | """.stripMargin)
>>>>> res7: org.apache.spark.sql.DataFrame = []
>>>>>
>>>>>
>>>>> In theory, selecting data via this view 'partitioned_table_1' should
>>>>> return the same result as selecting via the table 'table1'.
>>>>>
>>>>> This query can also push the filter 'id IN ('a','b','c','d')' to table2
>>>>> as expected.
>>>>>
>>>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2
>>>>> WHERE t1.id = t2.id AND t1.id IN ('a','b','c','d')").explain
>>>>> == Physical Plan ==
>>>>> *(6) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
>>>>> :- Union
>>>>> :  :- *(1) Project [id#0, val#1]
>>>>> :  :  +- *(1) Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN
>>>>> (a,b,c,d))
>>>>> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
>>>>> true, Format: Parquet, Location:
>>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a), In(id,
>>>>> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
>>>>> :  :- *(2) Project [id#0, val#1]
>>>>> :  :  +- *(2) Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN
>>>>> (a,b,c,d))
>>>>> :  :     +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
>>>>> true, Format: Parquet, Location:
>>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,b), In(id,
>>>>> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
>>>>> :  :- *(3) Project [id#0, val#1]
>>>>> :  :  +- *(3) Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN
>>>>> (a,b,c,d))
>>>>> :  :     +- *(3) FileScan parquet default.table1[id#0,val#1] Batched:
>>>>> true, Format: Parquet, Location:
>>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,c), In(id,
>>>>> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
>>>>> :  +- *(4) Project [id#0, val#1]
>>>>> :     +- *(4) Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) &&
>>>>> isnotnull(id#0))
>>>>> :        +- *(4) FileScan parquet default.table1[id#0,val#1] Batched:
>>>>> true, Format: Parquet, Location:
>>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>>> PartitionFilters: [], PushedFilters: [Not(In(id, [a,b,c])), In(id,
>>>>> [a,b,c,d]), IsNotNull(id)], ReadSchema: struct<id:string,val:string>
>>>>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>>>>> true]))
>>>>>    +- *(5) Project [id#23, val#24]
>>>>>       +- *(5) Filter ((id#23 IN (a,b,c,d) && ((isnotnull(id#23) &&
>>>>> (((id#23 = a) || (id#23 = b)) || (id#23 = c))) || NOT id#23 IN (a,b,c))) &&
>>>>> isnotnull(id#23))
>>>>>          +- *(5) FileScan parquet default.table2[id#23,val#24]
>>>>> Batched: true, Format: Parquet, Location:
>>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
>>>>> PartitionFilters: [], *PushedFilters: [In(id, [a,b,c,d]),
>>>>> Or(And(IsNotNull(id),Or(Or(EqualTo(id,a),EqualTo(id,b)),EqualTo(id,c))),Not(I...,
>>>>> *ReadSchema: struct<id:string,val:string>
>>>>>
>>>>> scala>
>>>>>
>>>>>
>>>>> However, if we change the filter to 'id = 'a'', something strange
>>>>> happens. The filter 'id = 'a'' cannot be pushed down to table2...
>>>>>
>>>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2
>>>>> WHERE t1.id = t2.id AND t1.id = 'a'").explain
>>>>> == Physical Plan ==
>>>>> *(4) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
>>>>> :- Union
>>>>> :  :- *(1) Project [id#0, val#1]
>>>>> :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
>>>>> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
>>>>> true, Format: Parquet, Location:
>>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
>>>>> ReadSchema: struct<id:string,val:string>
>>>>> :  :- LocalTableScan <empty>, [id#0, val#1]
>>>>> :  :- LocalTableScan <empty>, [id#0, val#1]
>>>>> :  +- *(2) Project [id#0, val#1]
>>>>> :     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) &&
>>>>> (id#0 = a))
>>>>> :        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
>>>>> true, Format: Parquet, Location:
>>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
>>>>> EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
>>>>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>>>>> true]))
>>>>>    +- *(3) Project [id#23, val#24]
>>>>>       +- *(3) Filter isnotnull(id#23)
>>>>>          +- *(3) FileScan parquet default.table2[id#23,val#24]
>>>>> Batched: true, Format: Parquet, Location:
>>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
>>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema:
>>>>> struct<id:string,val:string>
>>>>>
>>>>>
>>>>> I would appreciate it if anyone has an idea about this. Many thanks.
>>>>>
>>>>> Best regards,
>>>>> William
>>>>>
>>>>

Re: Filter cannot be pushed via a Join

Posted by Xiao Li <ga...@gmail.com>.
Hi, William,

Thanks for reporting it. Could you open a JIRA?

Cheers,

Xiao

William Wong <wi...@gmail.com> wrote on Tue, 18 Jun 2019 at 08:57:

> BTW, I noticed that a workaround is to create a custom optimizer rule that
> removes empty local relations from a Union. However, I am not 100% sure if
> it is the right approach.
>
> On Tue, Jun 18, 2019 at 11:53 PM William Wong <wi...@gmail.com>
> wrote:
>
>> Dear all,
>>
>> I am not sure whether this is expected behaviour or not, and whether I
>> should report it as a bug. Basically, the constraints of a Union can become
>> empty if any of its children is turned into an empty local relation. The
>> side effect is that filters cannot be inferred correctly (by
>> InferFiltersFromConstraints).
>>
>> We may reproduce the issue with the following setup:
>> 1) Prepare two tables:
>> * spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string)
>> USING PARQUET");
>> * spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string)
>> USING PARQUET");
>>
>> 2) Create a union view on table1.
>> * spark.sql("""
>>      | CREATE VIEW partitioned_table_1 AS
>>      | SELECT * FROM table1 WHERE id = 'a'
>>      | UNION ALL
>>      | SELECT * FROM table1 WHERE id = 'b'
>>      | UNION ALL
>>      | SELECT * FROM table1 WHERE id = 'c'
>>      | UNION ALL
>>      | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>>      | """.stripMargin)
>>
>> 3) View the optimized plan of this SQL. The filter 't2.id = 'a'' cannot
>> be inferred. We can see that the constraints of the left table are empty.
>>
>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>> t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan
>> res39: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
>> Join Inner, (id#0 = id#4)
>> :- Union
>> :  :- Filter (isnotnull(id#0) && (id#0 = a))
>> :  :  +- Relation[id#0,val#1] parquet
>> :  :- LocalRelation <empty>, [id#0, val#1]
>> :  :- LocalRelation <empty>, [id#0, val#1]
>> :  +- Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
>> :     +- Relation[id#0,val#1] parquet
>> +- Filter isnotnull(id#4)
>>    +- Relation[id#4,val#5] parquet
>>
>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>> t1.id = t2.id AND t1.id =
>> 'a'").queryExecution.optimizedPlan.children(0).constraints
>> res40: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set()
>>
>> 4) Modify the query to avoid empty local relations. The filter 't2.id
>> IN ('a','b','c','d')' is then inferred properly, and the constraints of the
>> left table are no longer empty.
>>
>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>> t1.id = t2.id AND t1.id IN
>> ('a','b','c','d')").queryExecution.optimizedPlan
>> res42: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
>> Join Inner, (id#0 = id#4)
>> :- Union
>> :  :- Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
>> :  :  +- Relation[id#0,val#1] parquet
>> :  :- Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
>> :  :  +- Relation[id#0,val#1] parquet
>> :  :- Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
>> :  :  +- Relation[id#0,val#1] parquet
>> :  +- Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) &&
>> isnotnull(id#0))
>> :     +- Relation[id#0,val#1] parquet
>> +- Filter ((id#4 IN (a,b,c,d) && ((isnotnull(id#4) && (((id#4 = a) ||
>> (id#4 = b)) || (id#4 = c))) || NOT id#4 IN (a,b,c))) && isnotnull(id#4))
>>    +- Relation[id#4,val#5] parquet
>>
>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>> t1.id = t2.id AND t1.id IN
>> ('a','b','c','d')").queryExecution.optimizedPlan.children(0).constraints
>> res44: org.apache.spark.sql.catalyst.expressions.ExpressionSet =
>> Set(isnotnull(id#0), id#0 IN (a,b,c,d), ((((id#0 = a) || (id#0 = b)) ||
>> (id#0 = c)) || NOT id#0 IN (a,b,c)))
>>
>>
>> Thanks and regards,
>> William
>>
>>
>> On Sat, Jun 15, 2019 at 1:13 AM William Wong <wi...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I would appreciate any expert help on this strange behavior.
>>>
>>> Interestingly, I implemented a custom rule to remove empty LocalRelation
>>> children under Union and ran the same query. The filter 'id = 'a'' is then
>>> inferred for table2 and pushed down through the Join.
>>>
>>> scala> spark2.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id = 'a'").explain
>>> == Physical Plan ==
>>> *(4) BroadcastHashJoin [id#0], [id#4], Inner, BuildRight
>>> :- Union
>>> :  :- *(1) Project [id#0, val#1]
>>> :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
>>> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
>>> true, Format: Parquet, Location:
>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
>>> ReadSchema: struct<id:string,val:string>
>>> :  +- *(2) Project [id#0, val#1]
>>> :     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0
>>> = a))
>>> :        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
>>> true, Format: Parquet, Location:
>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
>>> EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
>>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>>> true]))
>>>    +- *(3) Project [id#4, val#5]
>>>       +- *(3) Filter ((id#4 = a) && isnotnull(id#4))
>>>          +- *(3) FileScan parquet default.table2[id#4,val#5] Batched:
>>> true, Format: Parquet, Location:
>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
>>> PartitionFilters: [], *PushedFilters: [EqualTo(id,a), IsNotNull(id)],*
>>> ReadSchema: struct<id:string,val:string>
>>>
>>> scala>
>>>
>>> Thanks and regards,
>>> William
>>>
>>>
>>>
>>> On Sat, Jun 15, 2019 at 12:13 AM William Wong <wi...@gmail.com>
>>> wrote:
>>>
>>>> Dear all,
>>>>
>>>> I created two tables.
>>>>
>>>> scala> spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val
>>>> string) USING PARQUET");
>>>> 19/06/14 23:49:10 WARN ObjectStore: Version information not found in
>>>> metastore. hive.metastore.schema.verification is not enabled so recording
>>>> the schema version 1.2.0
>>>> 19/06/14 23:49:11 WARN ObjectStore: Failed to get database default,
>>>> returning NoSuchObjectException
>>>> res1: org.apache.spark.sql.DataFrame = []
>>>>
>>>> scala> spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val
>>>> string) USING PARQUET");
>>>> res2: org.apache.spark.sql.DataFrame = []
>>>>
>>>>
>>>> Here is the plan of joining these two tables via the ID column. It looks
>>>> good to me, as the filter 'id = 'a'' is pushed to both tables as expected.
>>>>
>>>> scala> spark.sql("SELECT * FROM table2 t1, table2 t2 WHERE t1.id =
>>>> t2.id AND t1.id ='a'").explain
>>>> == Physical Plan ==
>>>> *(2) BroadcastHashJoin [id#23], [id#68], Inner, BuildRight
>>>> :- *(2) Project [id#23, val#24]
>>>> :  +- *(2) Filter (isnotnull(id#23) && (id#23 = a))
>>>> :     +- *(2) FileScan parquet default.table2[id#23,val#24] Batched:
>>>> true, Format: Parquet, Location:
>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], *PartitionFilters:
>>>> [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],* ReadSchema:
>>>> struct<id:string,val:string>
>>>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>>>> true]))
>>>>    +- *(1) Project [id#68, val#69]
>>>>       +- *(1) Filter ((id#68 = a) && isnotnull(id#68))
>>>>          +- *(1) FileScan parquet default.table2[id#68,val#69] Batched:
>>>> true, Format: Parquet, Location:
>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], *PartitionFilters:
>>>> [], PushedFilters: [EqualTo(id,a), IsNotNull(id)],* ReadSchema:
>>>> struct<id:string,val:string>
>>>>
>>>>
>>>> In our case, we created a view on table1 by unioning a few partitions,
>>>> like this:
>>>>
>>>> scala> spark.sql("""
>>>>      | CREATE VIEW partitioned_table_1 AS
>>>>      | SELECT * FROM table1 WHERE id = 'a'
>>>>      | UNION ALL
>>>>      | SELECT * FROM table1 WHERE id = 'b'
>>>>      | UNION ALL
>>>>      | SELECT * FROM table1 WHERE id = 'c'
>>>>      | UNION ALL
>>>>      | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>>>>      | """.stripMargin)
>>>> res7: org.apache.spark.sql.DataFrame = []
>>>>
>>>>
>>>> In theory, selecting data via this view 'partitioned_table_1' should
>>>> return the same result as selecting via the table 'table1'.
>>>>
>>>> This query can also push the filter 'id IN ('a','b','c','d')' to table2
>>>> as expected.
>>>>
>>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>>> t1.id = t2.id AND t1.id IN ('a','b','c','d')").explain
>>>> == Physical Plan ==
>>>> *(6) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
>>>> :- Union
>>>> :  :- *(1) Project [id#0, val#1]
>>>> :  :  +- *(1) Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN
>>>> (a,b,c,d))
>>>> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
>>>> true, Format: Parquet, Location:
>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a), In(id,
>>>> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
>>>> :  :- *(2) Project [id#0, val#1]
>>>> :  :  +- *(2) Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN
>>>> (a,b,c,d))
>>>> :  :     +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
>>>> true, Format: Parquet, Location:
>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,b), In(id,
>>>> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
>>>> :  :- *(3) Project [id#0, val#1]
>>>> :  :  +- *(3) Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN
>>>> (a,b,c,d))
>>>> :  :     +- *(3) FileScan parquet default.table1[id#0,val#1] Batched:
>>>> true, Format: Parquet, Location:
>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,c), In(id,
>>>> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
>>>> :  +- *(4) Project [id#0, val#1]
>>>> :     +- *(4) Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) &&
>>>> isnotnull(id#0))
>>>> :        +- *(4) FileScan parquet default.table1[id#0,val#1] Batched:
>>>> true, Format: Parquet, Location:
>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>> PartitionFilters: [], PushedFilters: [Not(In(id, [a,b,c])), In(id,
>>>> [a,b,c,d]), IsNotNull(id)], ReadSchema: struct<id:string,val:string>
>>>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>>>> true]))
>>>>    +- *(5) Project [id#23, val#24]
>>>>       +- *(5) Filter ((id#23 IN (a,b,c,d) && ((isnotnull(id#23) &&
>>>> (((id#23 = a) || (id#23 = b)) || (id#23 = c))) || NOT id#23 IN (a,b,c))) &&
>>>> isnotnull(id#23))
>>>>          +- *(5) FileScan parquet default.table2[id#23,val#24] Batched:
>>>> true, Format: Parquet, Location:
>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
>>>> PartitionFilters: [], *PushedFilters: [In(id, [a,b,c,d]),
>>>> Or(And(IsNotNull(id),Or(Or(EqualTo(id,a),EqualTo(id,b)),EqualTo(id,c))),Not(I...,
>>>> *ReadSchema: struct<id:string,val:string>
>>>>
>>>> scala>
>>>>
>>>>
>>>> However, if we change the filter to 'id = 'a'', something strange
>>>> happens. The filter 'id = 'a'' cannot be pushed down to table2...
>>>>
>>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>>> t1.id = t2.id AND t1.id = 'a'").explain
>>>> == Physical Plan ==
>>>> *(4) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
>>>> :- Union
>>>> :  :- *(1) Project [id#0, val#1]
>>>> :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
>>>> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
>>>> true, Format: Parquet, Location:
>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
>>>> ReadSchema: struct<id:string,val:string>
>>>> :  :- LocalTableScan <empty>, [id#0, val#1]
>>>> :  :- LocalTableScan <empty>, [id#0, val#1]
>>>> :  +- *(2) Project [id#0, val#1]
>>>> :     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0
>>>> = a))
>>>> :        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
>>>> true, Format: Parquet, Location:
>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
>>>> EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
>>>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>>>> true]))
>>>>    +- *(3) Project [id#23, val#24]
>>>>       +- *(3) Filter isnotnull(id#23)
>>>>          +- *(3) FileScan parquet default.table2[id#23,val#24] Batched:
>>>> true, Format: Parquet, Location:
>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema:
>>>> struct<id:string,val:string>
>>>>
>>>>
>>>> I would appreciate it if anyone has an idea about this. Many thanks.
>>>>
>>>> Best regards,
>>>> William
>>>>
>>>

>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
>>>> EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
>>>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>>>> true]))
>>>>    +- *(3) Project [id#23, val#24]
>>>>       +- *(3) Filter isnotnull(id#23)
>>>>          +- *(3) FileScan parquet default.table2[id#23,val#24] Batched:
>>>> true, Format: Parquet, Location:
>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema:
>>>> struct<id:string,val:string>
>>>>
>>>>
>>>> Appreciate if anyone has an idea on it. Many thanks.
>>>>
>>>> Best regards,
>>>> William
>>>>
>>>

Re: Filter cannot be pushed via a Join

Posted by William Wong <wi...@gmail.com>.
BTW, I noticed that a workaround is to create a custom rule that removes
'empty local relation' children from a union. However, I am not 100% sure it
is the right approach.
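
A rough sketch of what I mean (the object name and the registration step here
are only illustrative, not the exact code I used) is something like:

import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Union}
import org.apache.spark.sql.catalyst.rules.Rule

// Illustrative only: drop empty LocalRelation children from a Union so that
// the Union keeps the constraints of its remaining (non-empty) children.
object RemoveEmptyUnionChildren extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case u: Union =>
      val nonEmpty = u.children.filterNot {
        case l: LocalRelation => l.data.isEmpty
        case _                => false
      }
      nonEmpty match {
        case Seq(onlyChild)    => onlyChild   // a single remaining child needs no Union
        case cs if cs.nonEmpty => Union(cs)   // rebuild the Union from the non-empty children
        case _                 => u           // every child is empty: leave the plan unchanged
      }
  }
}

The idea is that, with the empty children removed before constraint
propagation, the Union keeps the constraints of its remaining children, so
InferFiltersFromConstraints can again infer id = 'a' for table2.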

On Tue, Jun 18, 2019 at 11:53 PM William Wong <wi...@gmail.com> wrote:

> Dear all,
>
> I am not sure whether this is expected behavior or whether I should report
> it as a bug. Basically, the constraints of a union can be turned empty if
> any of its subqueries is turned into an empty local relation. The side
> effect is that filters cannot be inferred correctly (by
> InferFiltersFromConstraints).
>
> We may reproduce the issue with the following setup:
> 1) Prepare two tables:
> * spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string)
> USING PARQUET");
> * spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string)
> USING PARQUET");
>
> 2) Create a union view on table1.
> * spark.sql("""
>      | CREATE VIEW partitioned_table_1 AS
>      | SELECT * FROM table1 WHERE id = 'a'
>      | UNION ALL
>      | SELECT * FROM table1 WHERE id = 'b'
>      | UNION ALL
>      | SELECT * FROM table1 WHERE id = 'c'
>      | UNION ALL
>      | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>      | """.stripMargin)
>
> 3) View the optimized plan of this SQL. The filter 't2.id = 'a'' cannot
> be inferred. We can see that the constraints of the left table are empty.
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
> t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan
> res39: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
> Join Inner, (id#0 = id#4)
> :- Union
> :  :- Filter (isnotnull(id#0) && (id#0 = a))
> :  :  +- Relation[id#0,val#1] parquet
> :  :- LocalRelation <empty>, [id#0, val#1]
> :  :- LocalRelation <empty>, [id#0, val#1]
> :  +- Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
> :     +- Relation[id#0,val#1] parquet
> +- Filter isnotnull(id#4)
>    +- Relation[id#4,val#5] parquet
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
> t1.id = t2.id AND t1.id =
> 'a'").queryExecution.optimizedPlan.children(0).constraints
> res40: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set()
>
> 4) Modified the query to avoid empty local relation. The filter 't2.id in
> ('a','b','c','d')' is then inferred properly. The constraints of the left
> table are not empty as well.
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
> t1.id = t2.id AND t1.id IN
> ('a','b','c','d')").queryExecution.optimizedPlan
> res42: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
> Join Inner, (id#0 = id#4)
> :- Union
> :  :- Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
> :  :  +- Relation[id#0,val#1] parquet
> :  :- Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
> :  :  +- Relation[id#0,val#1] parquet
> :  :- Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
> :  :  +- Relation[id#0,val#1] parquet
> :  +- Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) &&
> isnotnull(id#0))
> :     +- Relation[id#0,val#1] parquet
> +- Filter ((id#4 IN (a,b,c,d) && ((isnotnull(id#4) && (((id#4 = a) ||
> (id#4 = b)) || (id#4 = c))) || NOT id#4 IN (a,b,c))) && isnotnull(id#4))
>    +- Relation[id#4,val#5] parquet
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
> t1.id = t2.id AND t1.id IN
> ('a','b','c','d')").queryExecution.optimizedPlan.children(0).constraints
> res44: org.apache.spark.sql.catalyst.expressions.ExpressionSet =
> Set(isnotnull(id#0), id#0 IN (a,b,c,d), ((((id#0 = a) || (id#0 = b)) ||
> (id#0 = c)) || NOT id#0 IN (a,b,c)))
>
>
> Thanks and regards,
> William
>
>
> On Sat, Jun 15, 2019 at 1:13 AM William Wong <wi...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I would appreciate any expert's help with this strange behavior.
>>
>> Interestingly, I implemented a custom rule to remove empty LocalRelation
>> children under Union and ran the same query. The filter id = 'a' is then
>> inferred for table2 and pushed via the Join.
>>
>> scala> spark2.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>> t1.id = t2.id AND t1.id = 'a'").explain
>> == Physical Plan ==
>> *(4) BroadcastHashJoin [id#0], [id#4], Inner, BuildRight
>> :- Union
>> :  :- *(1) Project [id#0, val#1]
>> :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
>> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
>> ReadSchema: struct<id:string,val:string>
>> :  +- *(2) Project [id#0, val#1]
>> :     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 =
>> a))
>> :        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>> PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
>> EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>> true]))
>>    +- *(3) Project [id#4, val#5]
>>       +- *(3) Filter ((id#4 = a) && isnotnull(id#4))
>>          +- *(3) FileScan parquet default.table2[id#4,val#5] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
>> PartitionFilters: [], *PushedFilters: [EqualTo(id,a), IsNotNull(id)],*
>> ReadSchema: struct<id:string,val:string>
>>
>> scala>
>>
>> Thanks and regards,
>> William
>>
>>
>>
>> On Sat, Jun 15, 2019 at 12:13 AM William Wong <wi...@gmail.com>
>> wrote:
>>
>>> Dear all,
>>>
>>> I created two tables.
>>>
>>> scala> spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val
>>> string) USING PARQUET");
>>> 19/06/14 23:49:10 WARN ObjectStore: Version information not found in
>>> metastore. hive.metastore.schema.verification is not enabled so recording
>>> the schema version 1.2.0
>>> 19/06/14 23:49:11 WARN ObjectStore: Failed to get database default,
>>> returning NoSuchObjectException
>>> res1: org.apache.spark.sql.DataFrame = []
>>>
>>> scala> spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val
>>> string) USING PARQUET");
>>> res2: org.apache.spark.sql.DataFrame = []
>>>
>>>
>>> It is the plan of joining these two column via ID column. It looks good
>>> to me as the filter 'id ='a'' is pushed to both tables as expected.
>>>
>>> scala> spark.sql("SELECT * FROM table2 t1, table2 t2 WHERE t1.id = t2.id
>>> AND t1.id ='a'").explain
>>> == Physical Plan ==
>>> *(2) BroadcastHashJoin [id#23], [id#68], Inner, BuildRight
>>> :- *(2) Project [id#23, val#24]
>>> :  +- *(2) Filter (isnotnull(id#23) && (id#23 = a))
>>> :     +- *(2) FileScan parquet default.table2[id#23,val#24] Batched:
>>> true, Format: Parquet, Location:
>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], *PartitionFilters:
>>> [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],* ReadSchema:
>>> struct<id:string,val:string>
>>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>>> true]))
>>>    +- *(1) Project [id#68, val#69]
>>>       +- *(1) Filter ((id#68 = a) && isnotnull(id#68))
>>>          +- *(1) FileScan parquet default.table2[id#68,val#69] Batched:
>>> true, Format: Parquet, Location:
>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], *PartitionFilters:
>>> [], PushedFilters: [EqualTo(id,a), IsNotNull(id)],* ReadSchema:
>>> struct<id:string,val:string>
>>>
>>>
>>> Somehow, we created a view on table1 by union a few partitions like this:
>>>
>>> scala> spark.sql("""
>>>      | CREATE VIEW partitioned_table_1 AS
>>>      | SELECT * FROM table1 WHERE id = 'a'
>>>      | UNION ALL
>>>      | SELECT * FROM table1 WHERE id = 'b'
>>>      | UNION ALL
>>>      | SELECT * FROM table1 WHERE id = 'c'
>>>      | UNION ALL
>>>      | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>>>      | """.stripMargin)
>>> res7: org.apache.spark.sql.DataFrame = []
>>>
>>>
>>> In theory, selecting data via this view 'partitioned_table_1' should be
>>> the same as via the table 'table1'
>>>
>>> This query also can push the filter 'id IN ('a','b','c','d') to table2
>>> as expected.
>>>
>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id IN ('a','b','c','d')").explain
>>> == Physical Plan ==
>>> *(6) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
>>> :- Union
>>> :  :- *(1) Project [id#0, val#1]
>>> :  :  +- *(1) Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN
>>> (a,b,c,d))
>>> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
>>> true, Format: Parquet, Location:
>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a), In(id,
>>> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
>>> :  :- *(2) Project [id#0, val#1]
>>> :  :  +- *(2) Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN
>>> (a,b,c,d))
>>> :  :     +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
>>> true, Format: Parquet, Location:
>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,b), In(id,
>>> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
>>> :  :- *(3) Project [id#0, val#1]
>>> :  :  +- *(3) Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN
>>> (a,b,c,d))
>>> :  :     +- *(3) FileScan parquet default.table1[id#0,val#1] Batched:
>>> true, Format: Parquet, Location:
>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,c), In(id,
>>> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
>>> :  +- *(4) Project [id#0, val#1]
>>> :     +- *(4) Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) &&
>>> isnotnull(id#0))
>>> :        +- *(4) FileScan parquet default.table1[id#0,val#1] Batched:
>>> true, Format: Parquet, Location:
>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>> PartitionFilters: [], PushedFilters: [Not(In(id, [a,b,c])), In(id,
>>> [a,b,c,d]), IsNotNull(id)], ReadSchema: struct<id:string,val:string>
>>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>>> true]))
>>>    +- *(5) Project [id#23, val#24]
>>>       +- *(5) Filter ((id#23 IN (a,b,c,d) && ((isnotnull(id#23) &&
>>> (((id#23 = a) || (id#23 = b)) || (id#23 = c))) || NOT id#23 IN (a,b,c))) &&
>>> isnotnull(id#23))
>>>          +- *(5) FileScan parquet default.table2[id#23,val#24] Batched:
>>> true, Format: Parquet, Location:
>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
>>> PartitionFilters: [], *PushedFilters: [In(id, [a,b,c,d]),
>>> Or(And(IsNotNull(id),Or(Or(EqualTo(id,a),EqualTo(id,b)),EqualTo(id,c))),Not(I...,
>>> *ReadSchema: struct<id:string,val:string>
>>>
>>> scala>
>>>
>>>
>>> However, if we change the filter to 'id ='a', something strange
>>> happened. The filter 'id = 'a' cannot be pushed via table2...
>>>
>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id = 'a'").explain
>>> == Physical Plan ==
>>> *(4) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
>>> :- Union
>>> :  :- *(1) Project [id#0, val#1]
>>> :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
>>> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
>>> true, Format: Parquet, Location:
>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
>>> ReadSchema: struct<id:string,val:string>
>>> :  :- LocalTableScan <empty>, [id#0, val#1]
>>> :  :- LocalTableScan <empty>, [id#0, val#1]
>>> :  +- *(2) Project [id#0, val#1]
>>> :     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0
>>> = a))
>>> :        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
>>> true, Format: Parquet, Location:
>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
>>> EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
>>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>>> true]))
>>>    +- *(3) Project [id#23, val#24]
>>>       +- *(3) Filter isnotnull(id#23)
>>>          +- *(3) FileScan parquet default.table2[id#23,val#24] Batched:
>>> true, Format: Parquet, Location:
>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
>>> PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema:
>>> struct<id:string,val:string>
>>>
>>>
>>> Appreciate if anyone has an idea on it. Many thanks.
>>>
>>> Best regards,
>>> William
>>>
>>

Re: Filter cannot be pushed via a Join

Posted by William Wong <wi...@gmail.com>.
Dear all,

I am not sure whether this is expected behavior or whether I should report it
as a bug. Basically, the constraints of a union can be turned empty if any of
its subqueries is turned into an empty local relation. The side effect is that
filters cannot be inferred correctly (by InferFiltersFromConstraints).

We may reproduce the issue with the following setup:
1) Prepare two tables:
* spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string) USING
PARQUET");
* spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string) USING
PARQUET");

2) Create a union view on table1.
* spark.sql("""
     | CREATE VIEW partitioned_table_1 AS
     | SELECT * FROM table1 WHERE id = 'a'
     | UNION ALL
     | SELECT * FROM table1 WHERE id = 'b'
     | UNION ALL
     | SELECT * FROM table1 WHERE id = 'c'
     | UNION ALL
     | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
     | """.stripMargin)

3) View the optimized plan of this SQL. The filter 't2.id = 'a'' cannot be
inferred. We can see that the constraints of the left table are empty.

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan
res39: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Join Inner, (id#0 = id#4)
:- Union
:  :- Filter (isnotnull(id#0) && (id#0 = a))
:  :  +- Relation[id#0,val#1] parquet
:  :- LocalRelation <empty>, [id#0, val#1]
:  :- LocalRelation <empty>, [id#0, val#1]
:  +- Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
:     +- Relation[id#0,val#1] parquet
+- Filter isnotnull(id#4)
   +- Relation[id#4,val#5] parquet

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
t1.id = t2.id AND t1.id =
'a'").queryExecution.optimizedPlan.children(0).constraints
res40: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set()
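
Presumably this happens because a Union only keeps constraints that are common
to all of its children, and an empty LocalRelation carries no constraints at
all, so the common set collapses to nothing. A rough way to check the
per-child constraints (illustrative only):

// Print the constraints of each child of the Union built in step 3. The two
// empty LocalRelation children are expected to report an empty set.
val plan = spark.sql(
  "SELECT * FROM partitioned_table_1 t1, table2 t2 " +
  "WHERE t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan

plan.children(0).children.map(_.constraints).foreach(println)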

4) Modify the query to avoid the empty local relations. The filter 't2.id IN
('a','b','c','d')' is then inferred properly, and the constraints of the left
table are no longer empty.

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
t1.id = t2.id AND t1.id IN ('a','b','c','d')").queryExecution.optimizedPlan
res42: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Join Inner, (id#0 = id#4)
:- Union
:  :- Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
:  :  +- Relation[id#0,val#1] parquet
:  :- Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
:  :  +- Relation[id#0,val#1] parquet
:  :- Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
:  :  +- Relation[id#0,val#1] parquet
:  +- Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) && isnotnull(id#0))
:     +- Relation[id#0,val#1] parquet
+- Filter ((id#4 IN (a,b,c,d) && ((isnotnull(id#4) && (((id#4 = a) || (id#4
= b)) || (id#4 = c))) || NOT id#4 IN (a,b,c))) && isnotnull(id#4))
   +- Relation[id#4,val#5] parquet

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
t1.id = t2.id AND t1.id IN
('a','b','c','d')").queryExecution.optimizedPlan.children(0).constraints
res44: org.apache.spark.sql.catalyst.expressions.ExpressionSet =
Set(isnotnull(id#0), id#0 IN (a,b,c,d), ((((id#0 = a) || (id#0 = b)) ||
(id#0 = c)) || NOT id#0 IN (a,b,c)))


Thanks and regards,
William


On Sat, Jun 15, 2019 at 1:13 AM William Wong <wi...@gmail.com> wrote:

> Hi all,
>
> I would appreciate any expert's help with this strange behavior.
>
> Interestingly, I implemented a custom rule to remove empty LocalRelation
> children under Union and ran the same query. The filter id = 'a' is then
> inferred for table2 and pushed via the Join.
>
> scala> spark2.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
> t1.id = t2.id AND t1.id = 'a'").explain
> == Physical Plan ==
> *(4) BroadcastHashJoin [id#0], [id#4], Inner, BuildRight
> :- Union
> :  :- *(1) Project [id#0, val#1]
> :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
> ReadSchema: struct<id:string,val:string>
> :  +- *(2) Project [id#0, val#1]
> :     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 =
> a))
> :        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
> PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
> EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
> true]))
>    +- *(3) Project [id#4, val#5]
>       +- *(3) Filter ((id#4 = a) && isnotnull(id#4))
>          +- *(3) FileScan parquet default.table2[id#4,val#5] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
> PartitionFilters: [], *PushedFilters: [EqualTo(id,a), IsNotNull(id)],*
> ReadSchema: struct<id:string,val:string>
>
> scala>
>
> Thanks and regards,
> William
>
>
>
> On Sat, Jun 15, 2019 at 12:13 AM William Wong <wi...@gmail.com>
> wrote:
>
>> Dear all,
>>
>> I created two tables.
>>
>> scala> spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val
>> string) USING PARQUET");
>> 19/06/14 23:49:10 WARN ObjectStore: Version information not found in
>> metastore. hive.metastore.schema.verification is not enabled so recording
>> the schema version 1.2.0
>> 19/06/14 23:49:11 WARN ObjectStore: Failed to get database default,
>> returning NoSuchObjectException
>> res1: org.apache.spark.sql.DataFrame = []
>>
>> scala> spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val
>> string) USING PARQUET");
>> res2: org.apache.spark.sql.DataFrame = []
>>
>>
>> It is the plan of joining these two column via ID column. It looks good
>> to me as the filter 'id ='a'' is pushed to both tables as expected.
>>
>> scala> spark.sql("SELECT * FROM table2 t1, table2 t2 WHERE t1.id = t2.id
>> AND t1.id ='a'").explain
>> == Physical Plan ==
>> *(2) BroadcastHashJoin [id#23], [id#68], Inner, BuildRight
>> :- *(2) Project [id#23, val#24]
>> :  +- *(2) Filter (isnotnull(id#23) && (id#23 = a))
>> :     +- *(2) FileScan parquet default.table2[id#23,val#24] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], *PartitionFilters:
>> [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],* ReadSchema:
>> struct<id:string,val:string>
>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>> true]))
>>    +- *(1) Project [id#68, val#69]
>>       +- *(1) Filter ((id#68 = a) && isnotnull(id#68))
>>          +- *(1) FileScan parquet default.table2[id#68,val#69] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], *PartitionFilters:
>> [], PushedFilters: [EqualTo(id,a), IsNotNull(id)],* ReadSchema:
>> struct<id:string,val:string>
>>
>>
>> Somehow, we created a view on table1 by union a few partitions like this:
>>
>> scala> spark.sql("""
>>      | CREATE VIEW partitioned_table_1 AS
>>      | SELECT * FROM table1 WHERE id = 'a'
>>      | UNION ALL
>>      | SELECT * FROM table1 WHERE id = 'b'
>>      | UNION ALL
>>      | SELECT * FROM table1 WHERE id = 'c'
>>      | UNION ALL
>>      | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>>      | """.stripMargin)
>> res7: org.apache.spark.sql.DataFrame = []
>>
>>
>> In theory, selecting data via this view 'partitioned_table_1' should be
>> the same as via the table 'table1'
>>
>> This query also can push the filter 'id IN ('a','b','c','d') to table2 as
>> expected.
>>
>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>> t1.id = t2.id AND t1.id IN ('a','b','c','d')").explain
>> == Physical Plan ==
>> *(6) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
>> :- Union
>> :  :- *(1) Project [id#0, val#1]
>> :  :  +- *(1) Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN
>> (a,b,c,d))
>> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a), In(id,
>> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
>> :  :- *(2) Project [id#0, val#1]
>> :  :  +- *(2) Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN
>> (a,b,c,d))
>> :  :     +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,b), In(id,
>> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
>> :  :- *(3) Project [id#0, val#1]
>> :  :  +- *(3) Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN
>> (a,b,c,d))
>> :  :     +- *(3) FileScan parquet default.table1[id#0,val#1] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,c), In(id,
>> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
>> :  +- *(4) Project [id#0, val#1]
>> :     +- *(4) Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) &&
>> isnotnull(id#0))
>> :        +- *(4) FileScan parquet default.table1[id#0,val#1] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>> PartitionFilters: [], PushedFilters: [Not(In(id, [a,b,c])), In(id,
>> [a,b,c,d]), IsNotNull(id)], ReadSchema: struct<id:string,val:string>
>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>> true]))
>>    +- *(5) Project [id#23, val#24]
>>       +- *(5) Filter ((id#23 IN (a,b,c,d) && ((isnotnull(id#23) &&
>> (((id#23 = a) || (id#23 = b)) || (id#23 = c))) || NOT id#23 IN (a,b,c))) &&
>> isnotnull(id#23))
>>          +- *(5) FileScan parquet default.table2[id#23,val#24] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
>> PartitionFilters: [], *PushedFilters: [In(id, [a,b,c,d]),
>> Or(And(IsNotNull(id),Or(Or(EqualTo(id,a),EqualTo(id,b)),EqualTo(id,c))),Not(I...,
>> *ReadSchema: struct<id:string,val:string>
>>
>> scala>
>>
>>
>> However, if we change the filter to 'id ='a', something strange happened.
>> The filter 'id = 'a' cannot be pushed via table2...
>>
>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>> t1.id = t2.id AND t1.id = 'a'").explain
>> == Physical Plan ==
>> *(4) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
>> :- Union
>> :  :- *(1) Project [id#0, val#1]
>> :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
>> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
>> ReadSchema: struct<id:string,val:string>
>> :  :- LocalTableScan <empty>, [id#0, val#1]
>> :  :- LocalTableScan <empty>, [id#0, val#1]
>> :  +- *(2) Project [id#0, val#1]
>> :     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 =
>> a))
>> :        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>> PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
>> EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>> true]))
>>    +- *(3) Project [id#23, val#24]
>>       +- *(3) Filter isnotnull(id#23)
>>          +- *(3) FileScan parquet default.table2[id#23,val#24] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
>> PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema:
>> struct<id:string,val:string>
>>
>>
>> Appreciate if anyone has an idea on it. Many thanks.
>>
>> Best regards,
>> William
>>
>

Re: Filter cannot be pushed via a Join

Posted by William Wong <wi...@gmail.com>.
Hi all,

I would appreciate any expert's help with this strange behavior.

Interestingly, I implemented a custom rule to remove empty LocalRelation
children under Union and ran the same query. The filter id = 'a' is then
inferred for table2 and pushed via the Join.
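
For reference, spark2 below is a separate session with that rule registered as
an extra optimizer rule, roughly like this (assuming a rule object such as the
RemoveEmptyUnionChildren sketch above):

// Illustrative registration only; the session and rule names are assumptions.
val spark2 = spark.newSession()
spark2.experimental.extraOptimizations ++= Seq(RemoveEmptyUnionChildren)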

scala> spark2.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
t1.id = t2.id AND t1.id = 'a'").explain
== Physical Plan ==
*(4) BroadcastHashJoin [id#0], [id#4], Inner, BuildRight
:- Union
:  :- *(1) Project [id#0, val#1]
:  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
:  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
ReadSchema: struct<id:string,val:string>
:  +- *(2) Project [id#0, val#1]
:     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 =
a))
:        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
true]))
   +- *(3) Project [id#4, val#5]
      +- *(3) Filter ((id#4 = a) && isnotnull(id#4))
         +- *(3) FileScan parquet default.table2[id#4,val#5] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
PartitionFilters: [], *PushedFilters: [EqualTo(id,a), IsNotNull(id)],*
ReadSchema: struct<id:string,val:string>

scala>

Thanks and regards,
William




Re: Filter cannot be pushed via a Join

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,
May I ask which version of Spark you are using, and in what environment?

Regards,
Gourav
