Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2019/06/19 16:40:00 UTC

[jira] [Assigned] (SPARK-28103) Cannot infer filters from union table with empty local relation table properly

     [ https://issues.apache.org/jira/browse/SPARK-28103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-28103:
------------------------------------

    Assignee: Apache Spark

> Cannot infer filters from union table with empty local relation table properly
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-28103
>                 URL: https://issues.apache.org/jira/browse/SPARK-28103
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.2, 2.4.1
>            Reporter: William Wong
>            Assignee: Apache Spark
>            Priority: Major
>
> Basically, the constraints of a union table can become empty if any of its subtables is turned into an empty local relation. The side effect is that filters cannot be inferred correctly (by InferFiltersFromConstraints).
>  
>  We may reproduce the issue with the following setup:
> 1) Prepare two tables: 
>  
> {code:java}
> spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string) USING PARQUET");
> spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string) USING PARQUET");{code}
>  
> 2) Create a union view on table1. 
> {code:java}
> spark.sql("""
>      | CREATE VIEW partitioned_table_1 AS
>      | SELECT * FROM table1 WHERE id = 'a'
>      | UNION ALL
>      | SELECT * FROM table1 WHERE id = 'b'
>      | UNION ALL
>      | SELECT * FROM table1 WHERE id = 'c'
>      | UNION ALL
>      | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>      | """.stripMargin){code}
>  
>  3) View the optimized plan of this SQL. The filter t2.id = 'a' cannot be inferred. We can see that the constraints of the left table are empty.
> {code:java}
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan
> res39: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
> Join Inner, (id#0 = id#4)
> :- Union
> :  :- Filter (isnotnull(id#0) && (id#0 = a))
> :  :  +- Relation[id#0,val#1] parquet
> :  :- LocalRelation <empty>, [id#0, val#1]
> :  :- LocalRelation <empty>, [id#0, val#1]
> :  +- Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
> :     +- Relation[id#0,val#1] parquet
> +- Filter isnotnull(id#4)
>    +- Relation[id#4,val#5] parquet
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan.children(0).constraints
> res40: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set()
>  
> {code}
>  
> 4) Modify the query to avoid the empty local relations. The filter t2.id IN ('a','b','c','d') is then inferred properly, and the constraints of the left table are no longer empty.
> {code:java}
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id IN ('a','b','c','d')").queryExecution.optimizedPlan
> res42: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
> Join Inner, (id#0 = id#4)
> :- Union
> :  :- Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
> :  :  +- Relation[id#0,val#1] parquet
> :  :- Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
> :  :  +- Relation[id#0,val#1] parquet
> :  :- Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
> :  :  +- Relation[id#0,val#1] parquet
> :  +- Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) && isnotnull(id#0))
> :     +- Relation[id#0,val#1] parquet
> +- Filter ((id#4 IN (a,b,c,d) && ((isnotnull(id#4) && (((id#4 = a) || (id#4 = b)) || (id#4 = c))) || NOT id#4 IN (a,b,c))) && isnotnull(id#4))
>    +- Relation[id#4,val#5] parquet
>  
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id IN ('a','b','c','d')").queryExecution.optimizedPlan.children(0).constraints
> res44: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set(isnotnull(id#0), id#0 IN (a,b,c,d), ((((id#0 = a) || (id#0 = b)) || (id#0 = c)) || NOT id#0 IN (a,b,c)))
> {code}
>  
> One possible workaround is to create a rule that removes all empty local relations from a union table. Alternatively, when we convert a relation into an empty local relation, we could preserve its constraints in the empty local relation as well. 
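> The first workaround might look like the sketch below, written against a toy plan ADT rather than the real Catalyst classes (Plan, Union, EmptyLocalRelation, and PruneEmptyUnionChildren are all hypothetical stand-ins for illustration only):

```scala
// Toy model of the proposed rule, NOT real Catalyst classes: prune
// empty local relations out of a Union, collapsing the Union when
// zero or one children remain.
sealed trait Plan
case class Relation(name: String) extends Plan
case class EmptyLocalRelation(output: Seq[String]) extends Plan
case class Union(children: Seq[Plan]) extends Plan

object PruneEmptyUnionChildren {
  def apply(plan: Plan): Plan = plan match {
    case Union(children) =>
      // Recurse into children first, then drop every empty local relation.
      val kept = children.map(apply).filterNot(_.isInstanceOf[EmptyLocalRelation])
      kept match {
        case Seq()       => EmptyLocalRelation(Nil) // every branch was empty
        case Seq(single) => single                  // a one-child union is just that child
        case many        => Union(many)
      }
    case other => other
  }
}
```

> With the empty branches gone, the remaining branches' constraints would again participate in the union's constraint intersection, so InferFiltersFromConstraints could push t1.id = 'a' to the other join side.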
>  
> A side note: expressions in the optimized plan are not fully simplified. For example, the expression 
> {code:java}
> ((id#4 IN (a,b,c,d) && ((isnotnull(id#4) && (((id#4 = a) || (id#4 = b)) || (id#4 = c))) || NOT id#4 IN (a,b,c))) && isnotnull(id#4)){code}
> could be further optimized into 
> {code:java}
> (isnotnull(id#4) && (id#4 = d)){code}
> We may implement the following rules to simplify the expression. 
> 1) Convert all 'equal' operators into 'in' operators, and then group all 'in' and 'not in' expressions by attribute reference: 
>     i) eq(a,val) => in(a,val::Nil)
> 2) Merge all those 'in' and 'not in' operators, like:
>     i)  or(in(a,list1),in(a,list2)) => in(a, list1 ++ list2)
>     ii) or(in(a,list1),not(in(a,list2))) => not(in(a, list2 -- list1)) 
>    iii) and(in(a,list1),in(a,list2)) => in(a, list1 intersect list2) 
>    iv) and(in(a,list1),not(in(a,list2))) => in(a, list1 -- list2) 
> 3) Revert an 'in' operator into 'equal' if there is only one element in the list: 
>   i) in(a,list) if list.size == 1 => eq(a,list.head) 
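> The three steps above can be sketched over a toy predicate ADT (In, NotIn, Eq, Or, And, and SimplifyInSets are hypothetical names, not Catalyst expressions; a real rule would also need to normalize operand order and run the merge step to a fixpoint):

```scala
// Minimal sketch of the proposed simplification over a toy predicate ADT.
sealed trait Pred
case class Eq(attr: String, value: String) extends Pred
case class In(attr: String, values: Set[String]) extends Pred
case class NotIn(attr: String, values: Set[String]) extends Pred
case class Or(left: Pred, right: Pred) extends Pred
case class And(left: Pred, right: Pred) extends Pred

object SimplifyInSets {
  // Step 1: eq(a,val) => in(a, {val})
  private def toIn(p: Pred): Pred = p match {
    case Eq(a, v)  => In(a, Set(v))
    case Or(l, r)  => Or(toIn(l), toIn(r))
    case And(l, r) => And(toIn(l), toIn(r))
    case other     => other
  }

  // Step 2: merge in/not-in predicates over the same attribute.
  private def merge(p: Pred): Pred = p match {
    case Or(In(a, s1), In(b, s2)) if a == b     => In(a, s1 ++ s2)
    case Or(In(a, s1), NotIn(b, s2)) if a == b  => NotIn(a, s2 -- s1)
    case And(In(a, s1), In(b, s2)) if a == b    => In(a, s1 intersect s2)
    case And(In(a, s1), NotIn(b, s2)) if a == b => In(a, s1 -- s2)
    case Or(l, r)  => Or(merge(l), merge(r))
    case And(l, r) => And(merge(l), merge(r))
    case other     => other
  }

  // Step 3: in(a, {v}) => eq(a, v)
  private def toEq(p: Pred): Pred = p match {
    case In(a, s) if s.size == 1 => Eq(a, s.head)
    case Or(l, r)  => Or(toEq(l), toEq(r))
    case And(l, r) => And(toEq(l), toEq(r))
    case other     => other
  }

  def apply(p: Pred): Pred = toEq(merge(toIn(p)))
}
```

> For instance, the conjunction id IN (a,b,c,d) AND id NOT IN (a,b,c) merges to in(id, {d}) by rule 2.iv, which step 3 then reverts to eq(id, d) -- matching the simplified form shown above.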
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org