You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2015/05/20 22:39:11 UTC

spark git commit: Revert "[SPARK-7320] [SQL] Add Cube / Rollup for dataframe"

Repository: spark
Updated Branches:
  refs/heads/master 829f1d95b -> 6338c40da


Revert "[SPARK-7320] [SQL] Add Cube / Rollup for dataframe"

This reverts commit 10698e1131f665addb454cd498669920699a91b2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6338c40d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6338c40d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6338c40d

Branch: refs/heads/master
Commit: 6338c40da61de045485c51aa11a5b1e425d22144
Parents: 829f1d9
Author: Patrick Wendell <pa...@databricks.com>
Authored: Wed May 20 13:39:04 2015 -0700
Committer: Patrick Wendell <pa...@databricks.com>
Committed: Wed May 20 13:39:04 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/DataFrame.scala  | 104 +------------------
 .../org/apache/spark/sql/GroupedData.scala      |  92 +++++-----------
 .../sql/hive/HiveDataFrameAnalyticsSuite.scala  |  62 -----------
 3 files changed, 28 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6338c40d/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index d78b4c2..adad858 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -685,53 +685,7 @@ class DataFrame private[sql](
    * @since 1.3.0
    */
   @scala.annotation.varargs
-  def groupBy(cols: Column*): GroupedData = {
-    GroupedData(this, cols.map(_.expr), GroupedData.GroupByType)
-  }
-
-  /**
-   * Create a multi-dimensional rollup for the current [[DataFrame]] using the specified columns,
-   * so we can run aggregation on them.
-   * See [[GroupedData]] for all the available aggregate functions.
-   *
-   * {{{
-   *   // Compute the average for all numeric columns rolluped by department and group.
-   *   df.rollup($"department", $"group").avg()
-   *
-   *   // Compute the max age and average salary, rolluped by department and gender.
-   *   df.rollup($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   * @group dfops
-   * @since 1.4.0
-   */
-  @scala.annotation.varargs
-  def rollup(cols: Column*): GroupedData = {
-    GroupedData(this, cols.map(_.expr), GroupedData.RollupType)
-  }
-
-  /**
-   * Create a multi-dimensional cube for the current [[DataFrame]] using the specified columns,
-   * so we can run aggregation on them.
-   * See [[GroupedData]] for all the available aggregate functions.
-   *
-   * {{{
-   *   // Compute the average for all numeric columns cubed by department and group.
-   *   df.cube($"department", $"group").avg()
-   *
-   *   // Compute the max age and average salary, cubed by department and gender.
-   *   df.cube($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   * @group dfops
-   * @since 1.4.0
-   */
-  @scala.annotation.varargs
-  def cube(cols: Column*): GroupedData = GroupedData(this, cols.map(_.expr), GroupedData.CubeType)
+  def groupBy(cols: Column*): GroupedData = new GroupedData(this, cols.map(_.expr))
 
   /**
    * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them.
@@ -756,61 +710,7 @@ class DataFrame private[sql](
   @scala.annotation.varargs
   def groupBy(col1: String, cols: String*): GroupedData = {
     val colNames: Seq[String] = col1 +: cols
-    GroupedData(this, colNames.map(colName => resolve(colName)), GroupedData.GroupByType)
-  }
-
-  /**
-   * Create a multi-dimensional rollup for the current [[DataFrame]] using the specified columns,
-   * so we can run aggregation on them.
-   * See [[GroupedData]] for all the available aggregate functions.
-   *
-   * This is a variant of rollup that can only group by existing columns using column names
-   * (i.e. cannot construct expressions).
-   *
-   * {{{
-   *   // Compute the average for all numeric columns rolluped by department and group.
-   *   df.rollup("department", "group").avg()
-   *
-   *   // Compute the max age and average salary, rolluped by department and gender.
-   *   df.rollup($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   * @group dfops
-   * @since 1.4.0
-   */
-  @scala.annotation.varargs
-  def rollup(col1: String, cols: String*): GroupedData = {
-    val colNames: Seq[String] = col1 +: cols
-    GroupedData(this, colNames.map(colName => resolve(colName)), GroupedData.RollupType)
-  }
-
-  /**
-   * Create a multi-dimensional cube for the current [[DataFrame]] using the specified columns,
-   * so we can run aggregation on them.
-   * See [[GroupedData]] for all the available aggregate functions.
-   *
-   * This is a variant of cube that can only group by existing columns using column names
-   * (i.e. cannot construct expressions).
-   *
-   * {{{
-   *   // Compute the average for all numeric columns cubed by department and group.
-   *   df.cube("department", "group").avg()
-   *
-   *   // Compute the max age and average salary, cubed by department and gender.
-   *   df.cube($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   * @group dfops
-   * @since 1.4.0
-   */
-  @scala.annotation.varargs
-  def cube(col1: String, cols: String*): GroupedData = {
-    val colNames: Seq[String] = col1 +: cols
-    GroupedData(this, colNames.map(colName => resolve(colName)), GroupedData.CubeType)
+    new GroupedData(this, colNames.map(colName => resolve(colName)))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/6338c40d/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
index f730e4a..1381b9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -23,40 +23,9 @@ import scala.language.implicitConversions
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.Star
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{Rollup, Cube, Aggregate}
+import org.apache.spark.sql.catalyst.plans.logical.Aggregate
 import org.apache.spark.sql.types.NumericType
 
-/**
- * Companion object for GroupedData
- */
-private[sql] object GroupedData {
-  def apply(
-      df: DataFrame,
-      groupingExprs: Seq[Expression],
-      groupType: GroupType): GroupedData = {
-    new GroupedData(df, groupingExprs, groupType: GroupType)
-  }
-
-  /**
-   * The Grouping Type
-   */
-  trait GroupType
-
-  /**
-   * To indicate it's the GroupBy
-   */
-  object GroupByType extends GroupType
-
-  /**
-   * To indicate it's the CUBE
-   */
-  object CubeType extends GroupType
-
-  /**
-   * To indicate it's the ROLLUP
-   */
-  object RollupType extends GroupType
-}
 
 /**
  * :: Experimental ::
@@ -65,37 +34,19 @@ private[sql] object GroupedData {
  * @since 1.3.0
  */
 @Experimental
-class GroupedData protected[sql](
-    df: DataFrame,
-    groupingExprs: Seq[Expression],
-    private val groupType: GroupedData.GroupType) {
+class GroupedData protected[sql](df: DataFrame, groupingExprs: Seq[Expression]) {
 
-  private[this] def toDF(aggExprs: Seq[NamedExpression]): DataFrame = {
-    val aggregates = if (df.sqlContext.conf.dataFrameRetainGroupColumns) {
-        val retainedExprs = groupingExprs.map {
-          case expr: NamedExpression => expr
-          case expr: Expression => Alias(expr, expr.prettyString)()
-        }
-        retainedExprs ++ aggExprs
-      } else {
-        aggExprs
-      }
-
-    groupType match {
-      case GroupedData.GroupByType =>
-        DataFrame(
-          df.sqlContext, Aggregate(groupingExprs, aggregates, df.logicalPlan))
-      case GroupedData.RollupType =>
-        DataFrame(
-          df.sqlContext, Rollup(groupingExprs, df.logicalPlan, aggregates))
-      case GroupedData.CubeType =>
-        DataFrame(
-          df.sqlContext, Cube(groupingExprs, df.logicalPlan, aggregates))
+  private[sql] implicit def toDF(aggExprs: Seq[NamedExpression]): DataFrame = {
+    val namedGroupingExprs = groupingExprs.map {
+      case expr: NamedExpression => expr
+      case expr: Expression => Alias(expr, expr.prettyString)()
     }
+    DataFrame(
+      df.sqlContext, Aggregate(groupingExprs, namedGroupingExprs ++ aggExprs, df.logicalPlan))
   }
 
   private[this] def aggregateNumericColumns(colNames: String*)(f: Expression => Expression)
-    : DataFrame = {
+    : Seq[NamedExpression] = {
 
     val columnExprs = if (colNames.isEmpty) {
       // No columns specified. Use all numeric columns.
@@ -112,10 +63,10 @@ class GroupedData protected[sql](
         namedExpr
       }
     }
-    toDF(columnExprs.map { c =>
+    columnExprs.map { c =>
       val a = f(c)
       Alias(a, a.prettyString)()
-    })
+    }
   }
 
   private[this] def strToExpr(expr: String): (Expression => Expression) = {
@@ -168,10 +119,10 @@ class GroupedData protected[sql](
    * @since 1.3.0
    */
   def agg(exprs: Map[String, String]): DataFrame = {
-    toDF(exprs.map { case (colName, expr) =>
+    exprs.map { case (colName, expr) =>
       val a = strToExpr(expr)(df(colName).expr)
       Alias(a, a.prettyString)()
-    }.toSeq)
+    }.toSeq
   }
 
   /**
@@ -224,10 +175,19 @@ class GroupedData protected[sql](
    */
   @scala.annotation.varargs
   def agg(expr: Column, exprs: Column*): DataFrame = {
-    toDF((expr +: exprs).map(_.expr).map {
+    val aggExprs = (expr +: exprs).map(_.expr).map {
       case expr: NamedExpression => expr
       case expr: Expression => Alias(expr, expr.prettyString)()
-    })
+    }
+    if (df.sqlContext.conf.dataFrameRetainGroupColumns) {
+      val retainedExprs = groupingExprs.map {
+        case expr: NamedExpression => expr
+        case expr: Expression => Alias(expr, expr.prettyString)()
+      }
+      DataFrame(df.sqlContext, Aggregate(groupingExprs, retainedExprs ++ aggExprs, df.logicalPlan))
+    } else {
+      DataFrame(df.sqlContext, Aggregate(groupingExprs, aggExprs, df.logicalPlan))
+    }
   }
 
   /**
@@ -236,7 +196,7 @@ class GroupedData protected[sql](
    *
    * @since 1.3.0
    */
-  def count(): DataFrame = toDF(Seq(Alias(Count(Literal(1)), "count")()))
+  def count(): DataFrame = Seq(Alias(Count(Literal(1)), "count")())
 
   /**
    * Compute the average value for each numeric columns for each group. This is an alias for `avg`.
@@ -296,5 +256,5 @@ class GroupedData protected[sql](
   @scala.annotation.varargs
   def sum(colNames: String*): DataFrame = {
     aggregateNumericColumns(colNames:_*)(Sum)
-  }
+  }    
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6338c40d/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala
deleted file mode 100644
index 3ad05f4..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.test.TestHive
-import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.sql.hive.test.TestHive.implicits._
-
-case class TestData2Int(a: Int, b: Int)
-
-// TODO ideally we should put the test suite into the package `sql`, as
-// `hive` package is optional in compiling, however, `SQLContext.sql` doesn't
-// support the `cube` or `rollup` yet.
-class HiveDataFrameAnalyticsSuite extends QueryTest {
-  val testData =
-    TestHive.sparkContext.parallelize(
-      TestData2Int(1, 2) ::
-        TestData2Int(2, 4) :: Nil).toDF()
-
-  testData.registerTempTable("mytable")
-
-  test("rollup") {
-    checkAnswer(
-      testData.rollup($"a" + $"b", $"b").agg(sum($"a" - $"b")),
-      sql("select a + b, b, sum(a - b) from mytable group by a + b, b with rollup").collect()
-    )
-
-    checkAnswer(
-      testData.rollup("a", "b").agg(sum("b")),
-      sql("select a, b, sum(b) from mytable group by a, b with rollup").collect()
-    )
-  }
-
-  test("cube") {
-    checkAnswer(
-      testData.cube($"a" + $"b", $"b").agg(sum($"a" - $"b")),
-      sql("select a + b, b, sum(a - b) from mytable group by a + b, b with cube").collect()
-    )
-
-    checkAnswer(
-      testData.cube("a", "b").agg(sum("b")),
-      sql("select a, b, sum(b) from mytable group by a, b with cube").collect()
-    )
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org