You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/03/08 20:45:10 UTC

spark git commit: [SPARK-12727][SQL] support SQL generation for aggregate with multi-distinct

Repository: spark
Updated Branches:
  refs/heads/master ad3c9a973 -> 46881b4ea


[SPARK-12727][SQL] support SQL generation for aggregate with multi-distinct

## What changes were proposed in this pull request?

This PR add SQL generation support for aggregate with multi-distinct, by simply moving the `DistinctAggregationRewriter` rule to optimizer.

More discussions are needed as this breaks an import contract: analyzed plan should be able to run without optimization.  However, the `ComputeCurrentTime` rule has kind of broken it already, and I think maybe we should add a new phase for this kind of rules, because strictly speaking they don't belong to analysis and is coupled with the physical plan implementation.

## How was this patch tested?

existing tests

Author: Wenchen Fan <we...@databricks.com>

Closes #11579 from cloud-fan/distinct.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46881b4e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46881b4e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46881b4e

Branch: refs/heads/master
Commit: 46881b4ea229aecbb481626d8b9fbca24c0df075
Parents: ad3c9a9
Author: Wenchen Fan <we...@databricks.com>
Authored: Tue Mar 8 11:45:08 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Mar 8 11:45:08 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala    | 1 -
 .../sql/catalyst/analysis/DistinctAggregationRewriter.scala  | 8 ++------
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala  | 5 +++--
 .../org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala    | 5 +----
 4 files changed, 6 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/46881b4e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 268d7f2..9ab0a20 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -91,7 +91,6 @@ class Analyzer(
       ExtractWindowExpressions ::
       GlobalAggregates ::
       ResolveAggregateFunctions ::
-      DistinctAggregationRewriter(conf) ::
       HiveTypeCoercion.typeCoercionRules ++
       extendedResolutionRules : _*),
     Batch("Nondeterministic", Once,

http://git-wip-us.apache.org/repos/asf/spark/blob/46881b4e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala
index 7518946..38c1641 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Complete}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LogicalPlan}
@@ -100,13 +99,10 @@ import org.apache.spark.sql.types.IntegerType
  * we could improve this in the current rule by applying more advanced expression cannocalization
  * techniques.
  */
-case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalPlan] {
+object DistinctAggregationRewriter extends Rule[LogicalPlan] {
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case p if !p.resolved => p
-    // We need to wait until this Aggregate operator is resolved.
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case a: Aggregate => rewrite(a)
-    case p => p
   }
 
   def rewrite(a: Aggregate): Aggregate = {

http://git-wip-us.apache.org/repos/asf/spark/blob/46881b4e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index deea723..7455e68 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
 import scala.annotation.tailrec
 import scala.collection.immutable.HashSet
 
-import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubqueryAliases}
+import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
@@ -42,7 +42,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
     // we do not eliminate subqueries or compute current time in the analyzer.
     Batch("Finish Analysis", Once,
       EliminateSubqueryAliases,
-      ComputeCurrentTime) ::
+      ComputeCurrentTime,
+      DistinctAggregationRewriter) ::
     //////////////////////////////////////////////////////////////////////////////////////////
     // Optimizer rules start here
     //////////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/spark/blob/46881b4e/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
index ed85856..ccce987 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
@@ -139,7 +139,6 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
       """.stripMargin)
   }
 
-
   test("intersect") {
     checkHiveQl("SELECT * FROM t0 INTERSECT SELECT * FROM t0")
   }
@@ -367,9 +366,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
     checkHiveQl("SELECT * FROM parquet_t0 TABLESAMPLE(0.1 PERCENT) WHERE 1=0")
   }
 
-  // TODO Enable this
-  // Query plans transformed by DistinctAggregationRewriter are not recognized yet
-  ignore("multi-distinct columns") {
+  test("multi-distinct columns") {
     checkHiveQl("SELECT a, COUNT(DISTINCT b), COUNT(DISTINCT c), SUM(d) FROM parquet_t2 GROUP BY a")
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org