Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/09/14 16:55:32 UTC

[GitHub] [spark] huaxingao commented on a change in pull request #29695: [SPARK-32833][SQL] [WIP]JDBC V2 Datasource aggregate push down

huaxingao commented on a change in pull request #29695:
URL: https://github.com/apache/spark/pull/29695#discussion_r488084262



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
##########
@@ -69,6 +71,37 @@ object PushDownUtils extends PredicateHelper {
     }
   }
 
+    /**
+     * Pushes down aggregates to the data source reader
+     *
+     * @return pushed aggregates and post-scan aggregates.
+     */
+    def pushAggregates(scanBuilder: ScanBuilder, aggregates: Seq[AggregateExpression])
+      : (Seq[sources.AggregateFunction], Seq[AggregateExpression]) = {
+      scanBuilder match {
+        case r: SupportsPushDownAggregates =>
+          val translatedAggregates = mutable.ArrayBuffer.empty[sources.AggregateFunction]
+          // Catalyst aggregate expression that can't be translated to data source aggregates.
+          val untranslatableExprs = mutable.ArrayBuffer.empty[AggregateExpression]
+
+          for (aggregateExpr <- aggregates) {
+            val translated = DataSourceStrategy.translateAggregate(aggregateExpr)
+            if (translated.isEmpty) {
+              untranslatableExprs += aggregateExpr
+            } else {
+              translatedAggregates += translated.get
+            }
+          }
+
+          if (untranslatableExprs.isEmpty) r.pushAggregates(translatedAggregates.toArray)
+
+          // push down only if all the aggregates can be pushed down
+          if (!r.pushedAggregates.isEmpty) (r.pushedAggregates, Nil) else (Nil, aggregates)

Review comment:
       I actually perform the aggregation at the Spark layer regardless of whether the aggregate is pushed down.
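
       A minimal sketch of why aggregating again at the Spark layer can still give the right answer after push-down. It assumes the scan is split into several partitions, each returning one pre-aggregated row; the comment above does not state this. `ReaggregateSketch`, `splitAggregates`, and the sample data are hypothetical and use no Spark APIs. It also mirrors the all-or-nothing rule from the diff: push only if every aggregate translates.

```scala
object ReaggregateSketch {
  // Hypothetical helper mirroring the diff's rule: push down only when
  // every aggregate can be translated, otherwise push nothing.
  def splitAggregates[A, B](aggregates: Seq[A])(translate: A => Option[B]): (Seq[B], Seq[A]) = {
    val translated = aggregates.map(translate)
    if (translated.forall(_.isDefined)) (translated.flatten, Nil)
    else (Nil, aggregates)
  }

  // Each inner Seq stands for the rows one (assumed) scan partition reads.
  val partitions: Seq[Seq[Int]] = Seq(Seq(3, 9, 4), Seq(7, 1), Seq(8, 2, 6))

  // Without push-down: raw rows reach Spark, which computes MAX once.
  val maxNoPushDown: Int = partitions.flatten.max

  // With push-down: each partition returns its own MAX, and the
  // Spark-side MAX runs again over those partial results.
  val partialMax: Seq[Int] = partitions.map(_.max)
  val maxWithPushDown: Int = partialMax.max

  def main(args: Array[String]): Unit = {
    // Toy translator (an assumption): only "max" and "sum" are translatable.
    val translate: String => Option[String] =
      name => if (Set("max", "sum")(name)) Some(name.toUpperCase) else None

    println(splitAggregates(Seq("max", "sum"))(translate))
    println(splitAggregates(Seq("max", "median"))(translate))
    assert(maxNoPushDown == maxWithPushDown)
  }
}
```

       The second aggregation is harmless for MAX, MIN and SUM, because re-applying them to partial results yields the same value. COUNT would need the Spark side to sum the partial counts, so that case needs separate handling.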



