You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/10/01 00:48:55 UTC

[GitHub] [spark] huaxingao commented on a change in pull request #29695: [SPARK-32833][SQL] [WIP]JDBC V2 Datasource aggregate push down

huaxingao commented on a change in pull request #29695:
URL: https://github.com/apache/spark/pull/29695#discussion_r497889888



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
##########
@@ -643,6 +647,34 @@ object DataSourceStrategy {
     (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
   }
 
+  def translateAggregate(aggregates: AggregateExpression): Option[AggregateFunc] = {
+
+    def columnAsString(e: Expression): String = e match {
+      case AttributeReference(name, _, _, _) => name
+      case Cast(child, _, _) => child match {

Review comment:
       I guess this is probably OK?
   For example, if I have `sum(cast(SALARY as bigInt))` here, I will remove cast and push down `sum(SALARY)` to data source. Then I will cast the output of `sum(SALARY)` to bigInt. 

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
##########
@@ -643,6 +647,34 @@ object DataSourceStrategy {
     (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
   }
 
+  def translateAggregate(aggregates: AggregateExpression): Option[AggregateFunc] = {
+
+    def columnAsString(e: Expression): String = e match {
+      case AttributeReference(name, _, _, _) => name
+      case Cast(child, _, _) => child match {
+        case AttributeReference(name, _, _, _) => name
+        case _ => ""
+      }
+      case _ => ""
+    }
+
+    aggregates.aggregateFunction match {

Review comment:
       I will need to change the following to add `isDistinct` and `filter`. Also change `translateAggregate` accordingly. When push down the aggregates, need to check the filter to make sure it can be pushed down too.
   ```
   case class Avg(column: String, isDistinct: Boolean, filter: Option[Filter]) extends AggregateFunc
   
   case class Min(column: String, isDistinct: Boolean, filter: Option[Filter]) extends AggregateFunc
   
   case class Max(column: String, isDistinct: Boolean, filter: Option[Filter]) extends AggregateFunc
   
   case class Sum(column: String, isDistinct: Boolean, filter: Option[Filter]) extends AggregateFunc
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org