You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2017/01/04 11:46:33 UTC

spark git commit: [SPARK-19060][SQL] remove the supportsPartial flag in AggregateFunction

Repository: spark
Updated Branches:
  refs/heads/master fe1c895e1 -> 101556d0f


[SPARK-19060][SQL] remove the supportsPartial flag in AggregateFunction

## What changes were proposed in this pull request?

Now all aggregation functions support partial aggregate, we can remove the `supportsPartual` flag in `AggregateFunction`

## How was this patch tested?

existing tests.

Author: Wenchen Fan <we...@databricks.com>

Closes #16461 from cloud-fan/partial.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/101556d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/101556d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/101556d0

Branch: refs/heads/master
Commit: 101556d0fa704deca0f4a2e5070906d4af2c861b
Parents: fe1c895
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Jan 4 12:46:30 2017 +0100
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Wed Jan 4 12:46:30 2017 +0100

----------------------------------------------------------------------
 .../expressions/aggregate/interfaces.scala      |  6 ------
 .../expressions/windowExpressions.scala         |  1 -
 .../optimizer/RewriteDistinctAggregates.scala   |  7 ++-----
 .../spark/sql/execution/SparkStrategies.scala   | 13 +------------
 .../sql/execution/aggregate/AggUtils.scala      | 20 --------------------
 .../org/apache/spark/sql/hive/hiveUDFs.scala    |  2 --
 .../sql/hive/execution/TestingTypedCount.scala  |  2 --
 7 files changed, 3 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/101556d0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
index ccd4ae6..80c25d0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -174,12 +174,6 @@ abstract class AggregateFunction extends Expression {
   def inputAggBufferAttributes: Seq[AttributeReference]
 
   /**
-   * Indicates if this function supports partial aggregation.
-   * Currently Hive UDAF is the only one that doesn't support partial aggregation.
-   */
-  def supportsPartial: Boolean = true
-
-  /**
    * Result of the aggregate function when the input is empty. This is currently only used for the
    * proper rewriting of distinct aggregate functions.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/101556d0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index c0d6a6b..13115f4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -436,7 +436,6 @@ abstract class AggregateWindowFunction extends DeclarativeAggregate with WindowF
   override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
   override def dataType: DataType = IntegerType
   override def nullable: Boolean = true
-  override def supportsPartial: Boolean = false
   override lazy val mergeExpressions =
     throw new UnsupportedOperationException("Window Functions do not support merging.")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/101556d0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
index cd8912f..3b27cd2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
@@ -131,11 +131,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
         }
     }
 
-    // Check if the aggregates contains functions that do not support partial aggregation.
-    val existsNonPartial = aggExpressions.exists(!_.aggregateFunction.supportsPartial)
-
-    // Aggregation strategy can handle queries with a single distinct group and partial aggregates.
-    if (distinctAggGroups.size > 1 || (distinctAggGroups.size == 1 && existsNonPartial)) {
+    // Aggregation strategy can handle queries with a single distinct group.
+    if (distinctAggGroups.size > 1) {
       // Create the attributes for the grouping id and the group by clause.
       val gid = AttributeReference("gid", IntegerType, nullable = false)(isGenerated = true)
       val groupByMap = a.groupingExpressions.collect {

http://git-wip-us.apache.org/repos/asf/spark/blob/101556d0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 81cd5ef..28808f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -262,18 +262,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         }
 
         val aggregateOperator =
-          if (aggregateExpressions.map(_.aggregateFunction).exists(!_.supportsPartial)) {
-            if (functionsWithDistinct.nonEmpty) {
-              sys.error("Distinct columns cannot exist in Aggregate operator containing " +
-                "aggregate functions which don't support partial aggregation.")
-            } else {
-              aggregate.AggUtils.planAggregateWithoutPartial(
-                groupingExpressions,
-                aggregateExpressions,
-                resultExpressions,
-                planLater(child))
-            }
-          } else if (functionsWithDistinct.isEmpty) {
+          if (functionsWithDistinct.isEmpty) {
             aggregate.AggUtils.planAggregateWithoutDistinct(
               groupingExpressions,
               aggregateExpressions,

http://git-wip-us.apache.org/repos/asf/spark/blob/101556d0/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
index 8b8ccf4..aa789af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
@@ -27,26 +27,6 @@ import org.apache.spark.sql.internal.SQLConf
  * Utility functions used by the query planner to convert our plan to new aggregation code path.
  */
 object AggUtils {
-
-  def planAggregateWithoutPartial(
-      groupingExpressions: Seq[NamedExpression],
-      aggregateExpressions: Seq[AggregateExpression],
-      resultExpressions: Seq[NamedExpression],
-      child: SparkPlan): Seq[SparkPlan] = {
-
-    val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete))
-    val completeAggregateAttributes = completeAggregateExpressions.map(_.resultAttribute)
-    SortAggregateExec(
-      requiredChildDistributionExpressions = Some(groupingExpressions),
-      groupingExpressions = groupingExpressions,
-      aggregateExpressions = completeAggregateExpressions,
-      aggregateAttributes = completeAggregateAttributes,
-      initialInputBufferOffset = 0,
-      resultExpressions = resultExpressions,
-      child = child
-    ) :: Nil
-  }
-
   private def createAggregate(
       requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
       groupingExpressions: Seq[NamedExpression] = Nil,

http://git-wip-us.apache.org/repos/asf/spark/blob/101556d0/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index fcefd69..4590197 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -380,8 +380,6 @@ private[hive] case class HiveUDAFFunction(
 
   override def nullable: Boolean = true
 
-  override def supportsPartial: Boolean = true
-
   override lazy val dataType: DataType = inspectorToDataType(returnInspector)
 
   override def prettyName: String = name

http://git-wip-us.apache.org/repos/asf/spark/blob/101556d0/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala
index aaf1db6..31b2430 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala
@@ -42,8 +42,6 @@ case class TestingTypedCount(
 
   override def nullable: Boolean = false
 
-  override val supportsPartial: Boolean = true
-
   override def createAggregationBuffer(): State = TestingTypedCount.State(0L)
 
   override def update(buffer: State, input: InternalRow): State = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org