Posted to issues@spark.apache.org by "William Wong (JIRA)" <ji...@apache.org> on 2019/06/19 03:45:00 UTC

[jira] [Updated] (SPARK-28103) Cannot infer filters from union table with empty local relation table properly

     [ https://issues.apache.org/jira/browse/SPARK-28103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

William Wong updated SPARK-28103:
---------------------------------
    Description: 
Basically, the constraints of a union can become empty if any of its children is turned into an empty local relation. As a side effect, filters cannot be inferred correctly by InferFiltersFromConstraints.
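
To illustrate the mechanism, here is a minimal toy model in plain Scala. It is not Spark code: the names `Child` and `unionConstraints` are made up for this sketch, constraints are modelled as plain strings, and it assumes, as a simplification, that a union can only guarantee the constraints that every one of its children guarantees.

```scala
object UnionConstraintSketch {
  // Hypothetical stand-in for a plan node: only the constraints it guarantees matter here.
  final case class Child(description: String, constraints: Set[String])

  // Simplifying assumption: a union guarantees only what all of its children guarantee.
  def unionConstraints(children: Seq[Child]): Set[String] =
    children.map(_.constraints).reduceOption(_ intersect _).getOrElse(Set.empty[String])

  val filterA = Child("Filter(id = 'a')", Set("isnotnull(id)"))
  val filterNotIn = Child("Filter(NOT id IN ('a','b','c'))", Set("isnotnull(id)"))
  // An empty local relation carries no constraints of its own in this model.
  val emptyRelation = Child("LocalRelation <empty>", Set.empty[String])

  // Without an empty child, the shared constraint survives.
  val withoutEmpty = unionConstraints(Seq(filterA, filterNotIn))
  // A single empty child wipes out every constraint of the union.
  val withEmpty = unionConstraints(Seq(filterA, emptyRelation, filterNotIn))
}
```

In this model, one child with an empty constraint set is enough to leave the union with no constraints at all, which matches the `Set()` seen in step 3 below.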

 
 We may reproduce the issue with the following setup:

1) Prepare two tables: 

 
{code:java}
spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string) USING PARQUET");
spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string) USING PARQUET");{code}
 

2) Create a union view on table1. 
{code:java}
spark.sql("""
     | CREATE VIEW partitioned_table_1 AS
     | SELECT * FROM table1 WHERE id = 'a'
     | UNION ALL
     | SELECT * FROM table1 WHERE id = 'b'
     | UNION ALL
     | SELECT * FROM table1 WHERE id = 'c'
     | UNION ALL
     | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
     | """.stripMargin){code}
 

 3) View the optimized plan of the following query. The filter t2.id = 'a' is not inferred for table2, and the constraints of the left side of the join (the union) are empty.
{code:java}
scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan

res39: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Join Inner, (id#0 = id#4)
:- Union
:  :- Filter (isnotnull(id#0) && (id#0 = a))
:  :  +- Relation[id#0,val#1] parquet
:  :- LocalRelation <empty>, [id#0, val#1]
:  :- LocalRelation <empty>, [id#0, val#1]
:  +- Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
:     +- Relation[id#0,val#1] parquet
+- Filter isnotnull(id#4)
   +- Relation[id#4,val#5] parquet

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan.children(0).constraints
res40: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set()
 
{code}
 

4) Modify the query to avoid the empty local relations. The filter t2.id IN ('a','b','c','d') is then inferred properly, and the constraints of the left side of the join are no longer empty.
{code:java}
scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id IN ('a','b','c','d')").queryExecution.optimizedPlan
res42: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Join Inner, (id#0 = id#4)
:- Union
:  :- Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
:  :  +- Relation[id#0,val#1] parquet
:  :- Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
:  :  +- Relation[id#0,val#1] parquet
:  :- Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
:  :  +- Relation[id#0,val#1] parquet
:  +- Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) && isnotnull(id#0))
:     +- Relation[id#0,val#1] parquet
+- Filter ((id#4 IN (a,b,c,d) && ((isnotnull(id#4) && (((id#4 = a) || (id#4 = b)) || (id#4 = c))) || NOT id#4 IN (a,b,c))) && isnotnull(id#4))
   +- Relation[id#4,val#5] parquet
 
scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id IN ('a','b','c','d')").queryExecution.optimizedPlan.children(0).constraints
res44: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set(isnotnull(id#0), id#0 IN (a,b,c,d), ((((id#0 = a) || (id#0 = b)) || (id#0 = c)) || NOT id#0 IN (a,b,c)))
{code}
 

One possible workaround is to create a rule that removes all empty local relations from a union. Alternatively, when we convert a relation into an empty local relation, we should preserve its constraints in the empty local relation as well.
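
The first workaround could look roughly like the following sketch. It uses a tiny hypothetical plan ADT (`Relation`, `EmptyLocalRelation`, `Filter`, `Union`) rather than Catalyst's real classes, and it ignores details a real rule would have to handle, such as keeping output attributes consistent when a union is replaced by its only remaining child.

```scala
object PruneEmptyUnionChildrenSketch {
  // Hypothetical, heavily simplified logical plan; not Catalyst's classes.
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class EmptyLocalRelation(output: Seq[String]) extends Plan
  final case class Filter(condition: String, child: Plan) extends Plan
  final case class Union(children: Seq[Plan]) extends Plan

  // Drop empty local relations from every union, bottom-up.
  def pruneEmptyChildren(plan: Plan): Plan = plan match {
    case Union(children) =>
      val pruned = children.map(pruneEmptyChildren)
      val nonEmpty = pruned.filter {
        case _: EmptyLocalRelation => false
        case _ => true
      }
      nonEmpty match {
        case Seq() => pruned.headOption.getOrElse(plan) // every child was empty
        case Seq(only) => only                          // a one-child union is redundant
        case many => Union(many)
      }
    case Filter(condition, child) => Filter(condition, pruneEmptyChildren(child))
    case leaf => leaf
  }

  val table1 = Relation("table1")
  val empty = EmptyLocalRelation(Seq("id", "val"))

  // Shape of the left side of the join in step 3.
  val before = Union(Seq(
    Filter("id = a", table1),
    empty,
    empty,
    Filter("NOT id IN (a,b,c) AND id = a", table1)))

  val after = pruneEmptyChildren(before)
}
```

After pruning, only the two filtered branches remain under the union, so an empty child can no longer erase the union's constraints.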

 

A side note: expressions in the optimized plan are not well optimized. For example, the expression 
{code:java}
((id#4 IN (a,b,c,d) && ((isnotnull(id#4) && (((id#4 = a) || (id#4 = b)) || (id#4 = c))) || NOT id#4 IN (a,b,c))) && isnotnull(id#4)){code}
could be further optimized into 
{code:java}
(isnotnull(id#4) && id#4 IN (a,b,c,d)){code}
We may implement another rule to 

1) convert all 'equal' operators into 'in' operators 

2) group all expressions by attribute reference 

3) merge all the 'in' (or 'not in') operators for each attribute 

4) convert an 'in' operator back into 'equal' if only one element is left in its set. 
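
The steps above can be sketched for the simplest case, a conjunction (AND) of predicates, as follows. The predicate types `EqualTo`, `In` and `NotIn` here are hypothetical toy classes, not Catalyst expressions. The sketch assumes the attribute is known to be non-null, so SQL's three-valued NULL semantics are ignored, and it does not handle disjunctions.

```scala
object MergeInPredicatesSketch {
  // Hypothetical toy predicates over a single attribute with string literals.
  sealed trait Pred { def attr: String }
  final case class EqualTo(attr: String, value: String) extends Pred
  final case class In(attr: String, values: Set[String]) extends Pred
  final case class NotIn(attr: String, values: Set[String]) extends Pred

  // Rewrite every equality as a single-element IN.
  private def toIn(p: Pred): Pred = p match {
    case EqualTo(a, v) => In(a, Set(v))
    case other => other
  }

  // Group by attribute, merge IN / NOT IN sets, and turn one-element INs back into equalities.
  def mergeConjunction(preds: Seq[Pred]): Seq[Pred] =
    preds.map(toIn).groupBy(_.attr).toSeq.sortBy(_._1).flatMap { case (attr, group) =>
      val ins = group.collect { case In(_, vs) => vs }
      val excluded = group.collect { case NotIn(_, vs) => vs }.flatten.toSet
      if (ins.isEmpty) {
        if (excluded.isEmpty) Nil else Seq(NotIn(attr, excluded))
      } else {
        // AND of IN lists is their intersection; NOT IN removes values from it.
        // An empty result set means the conjunction can never be true.
        val allowed = ins.reduce(_ intersect _) -- excluded
        if (allowed.size == 1) Seq(EqualTo(attr, allowed.head)) else Seq(In(attr, allowed))
      }
    }

  // The last union branch in step 4: NOT id IN (a,b,c) AND id IN (a,b,c,d).
  val lastBranch = mergeConjunction(Seq(
    NotIn("id", Set("a", "b", "c")),
    In("id", Set("a", "b", "c", "d"))))

  // The first union branch in step 4: id = a AND id IN (a,b,c,d).
  val firstBranch = mergeConjunction(Seq(
    EqualTo("id", "a"),
    In("id", Set("a", "b", "c", "d"))))
}
```

Under these assumptions the last branch collapses to `id = d` and the first branch to `id = a`.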

 



> Cannot infer filters from union table with empty local relation table properly
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-28103
>                 URL: https://issues.apache.org/jira/browse/SPARK-28103
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.2, 2.4.1
>            Reporter: William Wong
>            Priority: Major


