Posted to commits@spark.apache.org by we...@apache.org on 2021/11/05 12:20:06 UTC

[spark] branch master updated: [SPARK-37212][SQL] Improve the implementation of aggregate pushdown

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d563623  [SPARK-37212][SQL] Improve the implementation of aggregate pushdown
d563623 is described below

commit d56362392e22ecdf0c0d01c7b233e20fb6e65607
Author: Jiaan Geng <be...@163.com>
AuthorDate: Fri Nov 5 20:19:12 2021 +0800

    [SPARK-37212][SQL] Improve the implementation of aggregate pushdown
    
    ### What changes were proposed in this pull request?
    Spark SQL supports aggregate pushdown for JDBC. While reading the current implementation, I found a few small issues.
    
    ### Why are the changes needed?
    Make variable names and comments more accurate, and simplify the code.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    
    ### How was this patch tested?
    Jenkins tests.
    
    Closes #34488 from beliefer/SPARK-37212.
    
    Authored-by: Jiaan Geng <be...@163.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../execution/datasources/DataSourceStrategy.scala | 12 ++++++------
 .../execution/datasources/v2/PushDownUtils.scala   | 22 +++++++++-------------
 .../datasources/v2/V2ScanRelationPushDown.scala    |  4 ++--
 3 files changed, 17 insertions(+), 21 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 2325597..6febbd5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -702,23 +702,23 @@ object DataSourceStrategy
     (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
   }
 
-  protected[sql] def translateAggregate(aggregates: AggregateExpression): Option[AggregateFunc] = {
-    if (aggregates.filter.isEmpty) {
-      aggregates.aggregateFunction match {
+  protected[sql] def translateAggregate(agg: AggregateExpression): Option[AggregateFunc] = {
+    if (agg.filter.isEmpty) {
+      agg.aggregateFunction match {
         case aggregate.Min(PushableColumnWithoutNestedColumn(name)) =>
           Some(new Min(FieldReference(name)))
         case aggregate.Max(PushableColumnWithoutNestedColumn(name)) =>
           Some(new Max(FieldReference(name)))
         case count: aggregate.Count if count.children.length == 1 =>
           count.children.head match {
-            // SELECT COUNT(*) FROM table is translated to SELECT 1 FROM table
+            // COUNT(any literal) is the same as COUNT(*)
             case Literal(_, _) => Some(new CountStar())
             case PushableColumnWithoutNestedColumn(name) =>
-              Some(new Count(FieldReference(name), aggregates.isDistinct))
+              Some(new Count(FieldReference(name), agg.isDistinct))
             case _ => None
           }
         case sum @ aggregate.Sum(PushableColumnWithoutNestedColumn(name), _) =>
-          Some(new Sum(FieldReference(name), aggregates.isDistinct))
+          Some(new Sum(FieldReference(name), agg.isDistinct))
         case _ => None
       }
     } else {
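
The hunk above makes two small corrections: the parameter is a single AggregateExpression, so the singular name agg is more accurate than aggregates, and the comment now states the actual rule being applied: COUNT of any literal argument is equivalent to COUNT(*), so it is translated to CountStar. Below is a simplified, self-contained sketch of this translation logic; the case classes are stand-ins for Spark's Catalyst and V2 classes, not the real API:

    // Stand-ins for Spark's V2 AggregateFunc hierarchy (illustrative only).
    sealed trait AggregateFunc
    case class Min(column: String) extends AggregateFunc
    case class CountStar() extends AggregateFunc
    case class Count(column: String, isDistinct: Boolean) extends AggregateFunc

    // Stand-ins for Catalyst's aggregate expressions (illustrative only).
    sealed trait CatalystAgg
    case class CMin(column: String) extends CatalystAgg
    case class CCountLit() extends CatalystAgg                // e.g. COUNT(1)
    case class CCount(column: String, distinct: Boolean) extends CatalystAgg
    case class CAvg(column: String) extends CatalystAgg       // not handled below

    def translate(agg: CatalystAgg): Option[AggregateFunc] = agg match {
      case CMin(c)      => Some(Min(c))
      case CCountLit()  => Some(CountStar())     // COUNT(literal) == COUNT(*)
      case CCount(c, d) => Some(Count(c, d))
      case _            => None                  // not pushable; caller falls back
    }
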
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
index f837ab5..d51b475 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
@@ -112,7 +112,7 @@ object PushDownUtils extends PredicateHelper {
    * @return pushed aggregation.
    */
   def pushAggregates(
-      scanBuilder: ScanBuilder,
+      scanBuilder: SupportsPushDownAggregates,
       aggregates: Seq[AggregateExpression],
       groupBy: Seq[Expression]): Option[Aggregation] = {
 
@@ -122,20 +122,16 @@ object PushDownUtils extends PredicateHelper {
       case _ => None
     }
 
-    scanBuilder match {
-      case r: SupportsPushDownAggregates if aggregates.nonEmpty =>
-        val translatedAggregates = aggregates.flatMap(DataSourceStrategy.translateAggregate)
-        val translatedGroupBys = groupBy.flatMap(columnAsString)
-
-        if (translatedAggregates.length != aggregates.length ||
-          translatedGroupBys.length != groupBy.length) {
-          return None
-        }
+    val translatedAggregates = aggregates.flatMap(DataSourceStrategy.translateAggregate)
+    val translatedGroupBys = groupBy.flatMap(columnAsString)
 
-        val agg = new Aggregation(translatedAggregates.toArray, translatedGroupBys.toArray)
-        Some(agg).filter(r.pushAggregation)
-      case _ => None
+    if (translatedAggregates.length != aggregates.length ||
+      translatedGroupBys.length != groupBy.length) {
+      return None
     }
+
+    val agg = new Aggregation(translatedAggregates.toArray, translatedGroupBys.toArray)
+    Some(agg).filter(scanBuilder.pushAggregation)
   }
 
   /**
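
With the parameter now typed as SupportsPushDownAggregates, the runtime match and the early "return None" branch disappear: the method translates every aggregate and grouping expression, gives up if any of them failed to translate (partial pushdown is not supported), and finally returns the Aggregation only when the source accepts it, via Some(agg).filter(scanBuilder.pushAggregation). A minimal sketch of that Option.filter idiom, with a hypothetical accept function standing in for pushAggregation:

    // Option.filter keeps the value only when the predicate returns true,
    // so the aggregation is surfaced only if the source accepts it.
    def tryPush(agg: String, accept: String => Boolean): Option[String] =
      Some(agg).filter(accept)

    // tryPush("MIN(a) GROUP BY b", _ => true)   returns Some(...)
    // tryPush("MIN(a) GROUP BY b", _ => false)  returns None
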
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index f73f831..36923d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -85,7 +85,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
         case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
           if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
           sHolder.builder match {
-            case _: SupportsPushDownAggregates =>
+            case r: SupportsPushDownAggregates =>
               val aggExprToOutputOrdinal = mutable.HashMap.empty[Expression, Int]
               var ordinal = 0
               val aggregates = resultExpressions.flatMap { expr =>
@@ -105,7 +105,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
               val normalizedGroupingExpressions = DataSourceStrategy.normalizeExprs(
                 groupingExpressions, sHolder.relation.output)
               val pushedAggregates = PushDownUtils.pushAggregates(
-                sHolder.builder, normalizedAggregates, normalizedGroupingExpressions)
+                r, normalizedAggregates, normalizedGroupingExpressions)
               if (pushedAggregates.isEmpty) {
                 aggNode // return original plan node
               } else {
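
Binding the matched builder to r in the type pattern gives the call site a reference whose static type is already SupportsPushDownAggregates, which is exactly what the new pushAggregates signature requires; before this change the wider sHolder.builder was passed and re-matched inside PushDownUtils. A toy illustration of this "match once, pass the narrow type" refactor, using illustrative traits rather than Spark's real interfaces:

    trait ScanBuilderLike
    trait AggPushDownLike extends ScanBuilderLike {
      def pushAggregation(agg: String): Boolean
    }

    // Before: take the wide type and match a second time inside.
    def pushOld(b: ScanBuilderLike, agg: String): Option[String] = b match {
      case r: AggPushDownLike => Some(agg).filter(r.pushAggregation)
      case _                  => None
    }

    // After: the caller has already matched, so take the narrow type directly.
    def pushNew(r: AggPushDownLike, agg: String): Option[String] =
      Some(agg).filter(r.pushAggregation)

    def handle(builder: ScanBuilderLike, agg: String): Option[String] =
      builder match {
        case r: AggPushDownLike => pushNew(r, agg) // r has the narrowed type
        case _                  => None            // keep the original plan
      }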
