Posted to issues@spark.apache.org by "Dongjoon Hyun (JIRA)" <ji...@apache.org> on 2016/06/30 22:22:10 UTC

[jira] [Updated] (SPARK-16208) Add `PropagateEmptyRelation` optimizer

     [ https://issues.apache.org/jira/browse/SPARK-16208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-16208:
----------------------------------
       Priority: Major  (was: Minor)
    Description: 
This issue adds a new logical optimizer, `PropagateEmptyRelation`, to collapse logical plans that consist only of empty LocalRelations.

**Optimizer Targets**

1. Binary (or higher)-node logical plans
   - Union with all empty children.
   - Join with one or two empty children (including Intersect/Except).
2. Unary-node logical plans
   - Project/Filter/Sample/Join/Limit/Repartition with all empty children.
   - Aggregate with all empty children and without AggregateFunction expressions such as COUNT, which can return a row even for empty input.
   - Generate with Explode only, because other UserDefinedGenerators, such as Hive UDTFs, may return results even for empty input.

**Sample Query**
{code}
WITH t1 AS (SELECT a FROM VALUES 1 t(a)),
     t2 AS (SELECT b FROM VALUES 1 t(b) WHERE 1=2)
SELECT a,b
FROM t1, t2
WHERE a=b
GROUP BY a,b
HAVING a>1
ORDER BY a,b
{code}

**Before**
{code}
scala> sql("with t1 as (select a from values 1 t(a)), t2 as (select b from values 1 t(b) where 1=2) select a,b from t1, t2 where a=b group by a,b having a>1 order by a,b").explain
== Physical Plan ==
*Sort [a#0 ASC, b#1 ASC], true, 0
+- Exchange rangepartitioning(a#0 ASC, b#1 ASC, 200)
   +- *HashAggregate(keys=[a#0, b#1], functions=[])
      +- Exchange hashpartitioning(a#0, b#1, 200)
         +- *HashAggregate(keys=[a#0, b#1], functions=[])
            +- *BroadcastHashJoin [a#0], [b#1], Inner, BuildRight
               :- *Filter (isnotnull(a#0) && (a#0 > 1))
               :  +- LocalTableScan [a#0]
               +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                  +- *Filter (isnotnull(b#1) && (b#1 > 1))
                     +- LocalTableScan <empty>, [b#1]
{code}

**After**
{code}
scala> sql("with t1 as (select a from values 1 t(a)), t2 as (select b from values 1 t(b) where 1=2) select a,b from t1, t2 where a=b group by a,b having a>1 order by a,b").explain
== Physical Plan ==
LocalTableScan <empty>, [a#0, b#1]
{code}
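To make the targets above concrete, here is a minimal sketch of the rewrite as a bottom-up pass over a toy plan tree. Everything in it (`PropagateEmptySketch`, the `Plan` case classes, `optimize`) is hypothetical and is not Spark's Catalyst API. It models only inner joins and omits Sample, Repartition and Generate. It also assumes, as the "Before" plan shows with `LocalTableScan <empty>`, that `WHERE 1=2` has already turned t2 into an empty LocalRelation.

{code}
// Toy model of the PropagateEmptyRelation idea. These classes are
// hypothetical stand-ins, NOT Spark's Catalyst API.
object PropagateEmptySketch {
  sealed trait Plan { def output: Seq[String] }

  // Leaf holding literal rows; an empty `rows` marks an empty relation.
  final case class LocalRelation(output: Seq[String], rows: Seq[Seq[Any]]) extends Plan
  final case class Project(output: Seq[String], child: Plan) extends Plan
  final case class Filter(cond: String, child: Plan) extends Plan { def output: Seq[String] = child.output }
  final case class Sort(keys: Seq[String], child: Plan) extends Plan { def output: Seq[String] = child.output }
  final case class Limit(n: Int, child: Plan) extends Plan { def output: Seq[String] = child.output }
  // `aggFunctions` lists aggregate calls such as "count"; empty means grouping only.
  final case class Aggregate(groupBy: Seq[String], aggFunctions: Seq[String],
                             output: Seq[String], child: Plan) extends Plan
  final case class Union(children: Seq[Plan]) extends Plan { def output: Seq[String] = children.head.output }
  final case class InnerJoin(left: Plan, right: Plan) extends Plan {
    def output: Seq[String] = left.output ++ right.output
  }

  private def isEmpty(p: Plan): Boolean = p match {
    case LocalRelation(_, rows) => rows.isEmpty
    case _                      => false
  }

  // Replace a node with an empty relation that keeps the node's output columns.
  private def empty(p: Plan): Plan = LocalRelation(p.output, Seq.empty)

  // Rewrite a single node whose children have already been optimized.
  private def rewriteNode(p: Plan): Plan = p match {
    case Union(cs) if cs.forall(isEmpty)             => empty(p)
    // An inner join yields no rows if either side is empty.
    case InnerJoin(l, r) if isEmpty(l) || isEmpty(r) => empty(p)
    case Project(_, c) if isEmpty(c)                 => empty(p)
    case Filter(_, c) if isEmpty(c)                  => empty(p)
    case Sort(_, c) if isEmpty(c)                    => empty(p)
    case Limit(_, c) if isEmpty(c)                   => empty(p)
    // Conservative: aggregate functions such as COUNT may return a row for empty input.
    case Aggregate(_, fs, _, c) if isEmpty(c) && fs.isEmpty => empty(p)
    case other                                       => other
  }

  // Bottom-up traversal: optimize children first, then the node itself,
  // so emptiness propagates from the leaves toward the root.
  def optimize(p: Plan): Plan = {
    val withNewChildren = p match {
      case Project(o, c)         => Project(o, optimize(c))
      case Filter(cond, c)       => Filter(cond, optimize(c))
      case Sort(k, c)            => Sort(k, optimize(c))
      case Limit(n, c)           => Limit(n, optimize(c))
      case Aggregate(g, f, o, c) => Aggregate(g, f, o, optimize(c))
      case Union(cs)             => Union(cs.map(optimize))
      case InnerJoin(l, r)       => InnerJoin(optimize(l), optimize(r))
      case leaf: LocalRelation   => leaf
    }
    rewriteNode(withNewChildren)
  }

  def main(args: Array[String]): Unit = {
    // Same shape as the sample query, with t2 already reduced to an empty relation.
    val t1 = LocalRelation(Seq("a"), Seq(Seq(1)))
    val t2 = LocalRelation(Seq("b"), Seq.empty)
    val plan = Sort(Seq("a", "b"),
      Filter("a > 1",
        Aggregate(Seq("a", "b"), Seq.empty, Seq("a", "b"),
          Filter("a = b", InnerJoin(t1, t2)))))
    println(optimize(plan))
  }
}
{code}

Running `main` should print an empty LocalRelation with output columns `a` and `b`, which matches the shape of the "After" plan. The Aggregate case is deliberately conservative: a global aggregate such as `SELECT count(*)` over empty input still returns one row, so the sketch leaves any Aggregate with aggregate functions untouched.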

  was:
This PR adds a new logical optimizer, `CollapseEmptyPlan`, to collapse logical plans that consist only of empty LocalRelations. The only exception is aggregation: for Aggregate plans, only simple cases are considered for this optimization.

**Before**
{code}
scala> sql("select a from values (1,2) T(a,b) where 1=0 group by a,b having a>1 order by a,b").explain
== Physical Plan ==
*Project [a#11]
+- *Sort [a#11 ASC, b#12 ASC], true, 0
   +- Exchange rangepartitioning(a#11 ASC, b#12 ASC, 200)
      +- *HashAggregate(keys=[a#11, b#12], functions=[])
         +- Exchange hashpartitioning(a#11, b#12, 200)
            +- *HashAggregate(keys=[a#11, b#12], functions=[])
               +- LocalTableScan <empty>, [a#11, b#12]
{code}

**After**
{code}
scala> sql("select a from values (1,2) T(a,b) where 1=0 group by a,b having a>1 order by a,b").explain
== Physical Plan ==
LocalTableScan <empty>, [a#0]
{code}

        Summary: Add `PropagateEmptyRelation` optimizer  (was: Add `CollapseEmptyPlan` optimizer)

> Add `PropagateEmptyRelation` optimizer
> --------------------------------------
>
>                 Key: SPARK-16208
>                 URL: https://issues.apache.org/jira/browse/SPARK-16208
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Dongjoon Hyun
>            Assignee: Apache Spark
>
