You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by aray <gi...@git.apache.org> on 2015/11/03 07:21:46 UTC

[GitHub] spark pull request: [SPARK-11275][SQL] Reimplement Expand as a Gen...

GitHub user aray opened a pull request:

    https://github.com/apache/spark/pull/9429

    [SPARK-11275][SQL] Reimplement Expand as a Generator and fix existing implementation bugs

    This is an alternative to https://github.com/apache/spark/pull/9419
    
    I got tired of fighting/fixing the bugs with the existing implementation of cube/rollup/grouping sets specifically around the Expand operator so I reimplemented it as a Generator. I think this makes for a cleaner implementation. I also added unit tests that show this implementation solves SPARK-11275.
    
    I look forward to your comments!
    
    cc: @rxin @marmbrus @gatorsmile @rick-ibm @hvanhovell @chenghao-intel @holdenk 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aray/spark SPARK-11275

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/9429.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #9429
    
----
commit fad28d6126187f88b473fc35692248ad2cb00748
Author: Andrew Ray <ra...@gmail.com>
Date:   2015-11-02T18:10:02Z

    Reimplement Expand as a Generator
    - added unit tests for cube and rollup that actualy check the result
    - fixed bugs present in previous implementation of cube/rollup/groupingsets (SPARK-11275)

commit e4636791da3367ee6fcb371f2fce029f3b2e8a3e
Author: Andrew Ray <ra...@gmail.com>
Date:   2015-11-03T06:06:41Z

    newline at end of generators.scala

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11275][SQL] Reimplement Expand as a Gen...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9429#issuecomment-153300549
  
    **[Test build #44901 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44901/consoleFull)** for PR 9429 at commit [`e463679`](https://github.com/apache/spark/commit/e4636791da3367ee6fcb371f2fce029f3b2e8a3e).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11275][SQL] Reimplement Expand as a Gen...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9429#issuecomment-153257216
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11275][SQL] Reimplement Expand as a Gen...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9429#issuecomment-153300620
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44901/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11275][SQL] Reimplement Expand as a Gen...

Posted by aray <gi...@git.apache.org>.
Github user aray commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9429#discussion_r43811041
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -205,45 +205,30 @@ class Analyzer(
             GroupingSets(bitmasks(a), a.groupByExprs, a.child, a.aggregations)
           case x: GroupingSets =>
             val gid = AttributeReference(VirtualColumn.groupingIdName, IntegerType, false)()
    -        // We will insert another Projection if the GROUP BY keys contains the
    -        // non-attribute expressions. And the top operators can references those
    -        // expressions by its alias.
    -        // e.g. SELECT key%5 as c1 FROM src GROUP BY key%5 ==>
    -        //      SELECT a as c1 FROM (SELECT key%5 AS a FROM src) GROUP BY a
    -
    -        // find all of the non-attribute expressions in the GROUP BY keys
    -        val nonAttributeGroupByExpressions = new ArrayBuffer[Alias]()
    -
    -        // The pair of (the original GROUP BY key, associated attribute)
    -        val groupByExprPairs = x.groupByExprs.map(_ match {
    -          case e: NamedExpression => (e, e.toAttribute)
    -          case other => {
    -            val alias = Alias(other, other.toString)()
    -            nonAttributeGroupByExpressions += alias // add the non-attributes expression alias
    -            (other, alias.toAttribute)
    -          }
    -        })
     
    -        // substitute the non-attribute expressions for aggregations.
    -        val aggregation = x.aggregations.map(expr => expr.transformDown {
    -          case e => groupByExprPairs.find(_._1.semanticEquals(e)).map(_._2).getOrElse(e)
    -        }.asInstanceOf[NamedExpression])
    +        val aliasedGroupByExprPairs = x.groupByExprs.map{
    +          case a @ Alias(expr, _) => (expr, a)
    +          case expr: NamedExpression => (expr, Alias(expr, expr.name)())
    --- End diff --
    
    I believe I need a new Alias here since we really have two versions of the expression -- the original and the version manipulated by the Generator with nulls inserted per the bitmask. In the Aggregate 'aggregation' list the grouping columns need to refer to the manipulated version and 'real' aggregates need to refer to the original version.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11275][SQL] Reimplement Expand as a Gen...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on the pull request:

    https://github.com/apache/spark/pull/9429#issuecomment-153540121
  
    After checking with Hive like:
    ```
    hive> select sum(a-b) as ab from mytable group by b with rollup;
    FAILED: SemanticException [Error 10210]: Grouping sets aggregations (with rollups or cubes) are not allowed if aggregation function parameters overlap with the aggregation functions columns
    ```
    Hive actually doesn't support the overlap with the aggregation functions columns. Probably we can have a simple fixing based on the current master branch if we need to support that. 
    
    And after double checking, the master branch will be optimized for expression constant folding while with the `Expand` operator, it means better performance than re-implemented based on UDTF, so I am a little struggling which is the better approach for the implementation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11275][SQL] Reimplement Expand as a Gen...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9429#discussion_r43724216
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -205,45 +205,30 @@ class Analyzer(
             GroupingSets(bitmasks(a), a.groupByExprs, a.child, a.aggregations)
           case x: GroupingSets =>
             val gid = AttributeReference(VirtualColumn.groupingIdName, IntegerType, false)()
    -        // We will insert another Projection if the GROUP BY keys contains the
    -        // non-attribute expressions. And the top operators can references those
    -        // expressions by its alias.
    -        // e.g. SELECT key%5 as c1 FROM src GROUP BY key%5 ==>
    -        //      SELECT a as c1 FROM (SELECT key%5 AS a FROM src) GROUP BY a
    -
    -        // find all of the non-attribute expressions in the GROUP BY keys
    -        val nonAttributeGroupByExpressions = new ArrayBuffer[Alias]()
    -
    -        // The pair of (the original GROUP BY key, associated attribute)
    -        val groupByExprPairs = x.groupByExprs.map(_ match {
    -          case e: NamedExpression => (e, e.toAttribute)
    -          case other => {
    -            val alias = Alias(other, other.toString)()
    -            nonAttributeGroupByExpressions += alias // add the non-attributes expression alias
    -            (other, alias.toAttribute)
    -          }
    -        })
     
    -        // substitute the non-attribute expressions for aggregations.
    -        val aggregation = x.aggregations.map(expr => expr.transformDown {
    -          case e => groupByExprPairs.find(_._1.semanticEquals(e)).map(_._2).getOrElse(e)
    -        }.asInstanceOf[NamedExpression])
    +        val aliasedGroupByExprPairs = x.groupByExprs.map{
    +          case a @ Alias(expr, _) => (expr, a)
    +          case expr: NamedExpression => (expr, Alias(expr, expr.name)())
    --- End diff --
    
    You could also use ```expr.toAttribute``` for a ```NamedExpression``` instead of creating an Alias.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11275][SQL] Reimplement Expand as a Gen...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9429#discussion_r43721708
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -205,45 +205,30 @@ class Analyzer(
             GroupingSets(bitmasks(a), a.groupByExprs, a.child, a.aggregations)
           case x: GroupingSets =>
             val gid = AttributeReference(VirtualColumn.groupingIdName, IntegerType, false)()
    -        // We will insert another Projection if the GROUP BY keys contains the
    -        // non-attribute expressions. And the top operators can references those
    -        // expressions by its alias.
    -        // e.g. SELECT key%5 as c1 FROM src GROUP BY key%5 ==>
    -        //      SELECT a as c1 FROM (SELECT key%5 AS a FROM src) GROUP BY a
    -
    -        // find all of the non-attribute expressions in the GROUP BY keys
    -        val nonAttributeGroupByExpressions = new ArrayBuffer[Alias]()
    -
    -        // The pair of (the original GROUP BY key, associated attribute)
    -        val groupByExprPairs = x.groupByExprs.map(_ match {
    -          case e: NamedExpression => (e, e.toAttribute)
    -          case other => {
    -            val alias = Alias(other, other.toString)()
    -            nonAttributeGroupByExpressions += alias // add the non-attributes expression alias
    -            (other, alias.toAttribute)
    -          }
    -        })
     
    -        // substitute the non-attribute expressions for aggregations.
    -        val aggregation = x.aggregations.map(expr => expr.transformDown {
    -          case e => groupByExprPairs.find(_._1.semanticEquals(e)).map(_._2).getOrElse(e)
    -        }.asInstanceOf[NamedExpression])
    +        val aliasedGroupByExprPairs = x.groupByExprs.map{
    +          case a @ Alias(expr, _) => (expr, a)
    +          case expr: NamedExpression => (expr, Alias(expr, expr.name)())
    +          case expr => (expr, Alias(expr, expr.prettyString)())
    +        }
     
    -        // substitute the group by expressions.
    -        val newGroupByExprs = groupByExprPairs.map(_._2)
    +        val aliasedGroupByExprs = aliasedGroupByExprPairs.map(_._2)
    +        val aliasedGroupByAttr = aliasedGroupByExprs.map(_.toAttribute)
     
    -        val child = if (nonAttributeGroupByExpressions.length > 0) {
    -          // insert additional projection if contains the
    -          // non-attribute expressions in the GROUP BY keys
    -          Project(x.child.output ++ nonAttributeGroupByExpressions, x.child)
    -        } else {
    -          x.child
    +        // substitute group by expressions in aggregation list with appropriate attribute
    +        val aggregations = x.aggregations.map{
    --- End diff --
    
    
    ```scala
    expr => expr.transformDown { 
    ..
    }
    ```
    Otherwise it's not able to substitute the expression like `sum(a+b) + count(c)` for `a+b`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11275][SQL] Reimplement Expand as a Gen...

Posted by aray <gi...@git.apache.org>.
Github user aray commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9429#discussion_r43810369
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -205,45 +205,30 @@ class Analyzer(
             GroupingSets(bitmasks(a), a.groupByExprs, a.child, a.aggregations)
           case x: GroupingSets =>
             val gid = AttributeReference(VirtualColumn.groupingIdName, IntegerType, false)()
    -        // We will insert another Projection if the GROUP BY keys contains the
    -        // non-attribute expressions. And the top operators can references those
    -        // expressions by its alias.
    -        // e.g. SELECT key%5 as c1 FROM src GROUP BY key%5 ==>
    -        //      SELECT a as c1 FROM (SELECT key%5 AS a FROM src) GROUP BY a
    -
    -        // find all of the non-attribute expressions in the GROUP BY keys
    -        val nonAttributeGroupByExpressions = new ArrayBuffer[Alias]()
    -
    -        // The pair of (the original GROUP BY key, associated attribute)
    -        val groupByExprPairs = x.groupByExprs.map(_ match {
    -          case e: NamedExpression => (e, e.toAttribute)
    -          case other => {
    -            val alias = Alias(other, other.toString)()
    -            nonAttributeGroupByExpressions += alias // add the non-attributes expression alias
    -            (other, alias.toAttribute)
    -          }
    -        })
     
    -        // substitute the non-attribute expressions for aggregations.
    -        val aggregation = x.aggregations.map(expr => expr.transformDown {
    -          case e => groupByExprPairs.find(_._1.semanticEquals(e)).map(_._2).getOrElse(e)
    -        }.asInstanceOf[NamedExpression])
    +        val aliasedGroupByExprPairs = x.groupByExprs.map{
    +          case a @ Alias(expr, _) => (expr, a)
    +          case expr: NamedExpression => (expr, Alias(expr, expr.name)())
    +          case expr => (expr, Alias(expr, expr.prettyString)())
    +        }
     
    -        // substitute the group by expressions.
    -        val newGroupByExprs = groupByExprPairs.map(_._2)
    +        val aliasedGroupByExprs = aliasedGroupByExprPairs.map(_._2)
    +        val aliasedGroupByAttr = aliasedGroupByExprs.map(_.toAttribute)
     
    -        val child = if (nonAttributeGroupByExpressions.length > 0) {
    -          // insert additional projection if contains the
    -          // non-attribute expressions in the GROUP BY keys
    -          Project(x.child.output ++ nonAttributeGroupByExpressions, x.child)
    -        } else {
    -          x.child
    +        // substitute group by expressions in aggregation list with appropriate attribute
    +        val aggregations = x.aggregations.map{
    --- End diff --
    
    @chenghao-intel actually that change would bring back the bug in question since it would do the substitutions in situations like below and the aggregations would be computed off the manipulated (nulls inserted) values.
    ```
    select a + b, c, sum(a+b) + count(c)
    from t1
    group by a + b, c with rollup
    ```
    In general anything below an AggregateExpression we don't want to transform, but above we do. So really I need a transformDownUntil method. BTW making this change does fix the `groupby_grouping_sets1` test so I really do need to do something.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11275][SQL] Reimplement Expand as a Gen...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9429#issuecomment-153280529
  
    **[Test build #44901 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44901/consoleFull)** for PR 9429 at commit [`e463679`](https://github.com/apache/spark/commit/e4636791da3367ee6fcb371f2fce029f3b2e8a3e).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11275][SQL] Reimplement Expand as a Gen...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9429#issuecomment-153278771
  
    Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11275][SQL] Reimplement Expand as a Gen...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9429#issuecomment-153300615
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11275][SQL] Reimplement Expand as a Gen...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on the pull request:

    https://github.com/apache/spark/pull/9429#issuecomment-153276663
  
    And anyone trigger the unit test? @yhuai @liancheng 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11275][SQL] Reimplement Expand as a Gen...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9429#issuecomment-153278676
  
     Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11275][SQL] Reimplement Expand as a Gen...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on the pull request:

    https://github.com/apache/spark/pull/9429#issuecomment-153276233
  
    That's cool, I like the idea to re-implement it by introducing a UDTF function, which looks much simpler.
    
    But it breaks the unit test in my local,  as run the unit test like below:
    ```
    build/sbt -Phive -Dspark.hive.whitelist='groupby.*_grouping.*' 'test-only org.apache.spark.sql.hive.execution.HiveCompatibilitySuite'
    ```
    Some unresolved attribute exceptions, I didn't dig it yet, but can you solve that?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11275][SQL] Reimplement Expand as a Gen...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/9429#issuecomment-153277867
  
    ok to test


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org