Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/03/26 07:26:22 UTC

[GitHub] [spark] maropu commented on a change in pull request #29695: [SPARK-22390][SPARK-32833][SQL] JDBC V2 Datasource aggregate push down

maropu commented on a change in pull request #29695:
URL: https://github.com/apache/spark/pull/29695#discussion_r602060646



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
##########
@@ -191,6 +191,9 @@ class JDBCOptions(
   // An option to allow/disallow pushing down predicate into JDBC data source
   val pushDownPredicate = parameters.getOrElse(JDBC_PUSHDOWN_PREDICATE, "true").toBoolean
 
+  // An option to allow/disallow pushing down aggregate into JDBC data source
+  val pushDownAggregate = parameters.getOrElse(JDBC_PUSHDOWN_AGGREGATE, "false").toBoolean

Review comment:
       I think this should be documented in https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
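    For reference, a minimal sketch of how a user would enable the new
    option from the DataFrame reader, assuming a `SparkSession` named
    `spark` and assuming the option key is the string "pushDownAggregate"
    (the actual key constant lives in `JDBCOptions`, so the spelling here
    is an assumption):

        // Hypothetical usage; the URL and table names are placeholders.
        val df = spark.read
          .format("jdbc")
          .option("url", "jdbc:postgresql://localhost:5432/testdb")
          .option("dbtable", "sales")
          .option("pushDownAggregate", "true") // disabled by default per this diff
          .load()
        // With the option enabled, an aggregate like this may be pushed
        // down to the database rather than computed in Spark:
        df.groupBy("region").sum("amount")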

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
##########
@@ -700,6 +704,51 @@ object DataSourceStrategy
     (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
   }
 
+  private def columnAsString(e: Expression): String = e match {
+    case AttributeReference(name, _, _, _) => name
+    case Cast(child, _, _) => columnAsString(child)
+    case Add(left, right, _) =>
+      columnAsString(left) + " + " + columnAsString(right)
+    case Subtract(left, right, _) =>
+      columnAsString(left) + " - " + columnAsString(right)
+    case Multiply(left, right, _) =>
+      columnAsString(left) + " * " + columnAsString(right)
+    case Divide(left, right, _) =>
+      columnAsString(left) + " / " + columnAsString(right)
+    case CheckOverflow(child, _, _) => columnAsString(child)
+    case PromotePrecision(child) => columnAsString(child)
+    case _ => ""
+  }
+
+  protected[sql] def translateAggregate(aggregates: AggregateExpression): Option[AggregateFunc] = {
+

Review comment:
       nit: unnecessary blank line.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
##########
@@ -700,6 +704,41 @@ object DataSourceStrategy
     (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
   }
 
+  private def columnAsString(e: Expression): String = e match {

Review comment:
       > This covers a lot of cases but also makes it easy to break. We can begin with the simplest case and add more support later.
   
   +1
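    In that spirit, the simplest possible starting point would accept only
    a bare column reference and reject everything else (a sketch of the
    reduced version, not necessarily the final shape of this PR):

        private def columnAsString(e: Expression): String = e match {
          case AttributeReference(name, _, _, _) => name // plain column only
          case _ => "" // anything else: give up, don't push the aggregate down
        }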
   
   

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -181,21 +248,53 @@ private[jdbc] class JDBCRDD(
     filters: Array[Filter],
     partitions: Array[Partition],
     url: String,
-    options: JDBCOptions)
+    options: JDBCOptions,
+    aggregation: Aggregation = Aggregation.empty)
   extends RDD[InternalRow](sc, Nil) {
 
   /**
    * Retrieve the list of partitions corresponding to this RDD.
    */
   override def getPartitions: Array[Partition] = partitions
 
+  private var updatedSchema: StructType = new StructType()

Review comment:
       Could we avoid using `var` here?
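    One way to avoid the mutation, assuming the schema only changes when an
    aggregation is pushed down and that the RDD already receives the base
    `schema` (the accessor on `Aggregation` and the helper below are
    hypothetical, not part of this diff):

        // Hypothetical: compute the schema once as a val instead of
        // reassigning a var later.
        private val updatedSchema: StructType =
          if (aggregation.aggregateExpressions.isEmpty) schema // no pushdown
          else schemaForAggregation(aggregation) // hypothetical helper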

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala
##########
@@ -77,15 +77,25 @@ case class Count(children: Seq[Expression]) extends DeclarativeAggregate {
 
   override def defaultResult: Option[Literal] = Option(Literal(0L))
 
+  private[sql] var pushDown: Boolean = false

Review comment:
       I feel that introducing this variable for the pushdown feature is not a good design. Couldn't we extend `Count` for pushdown-specific counting?
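    A rough sketch of that direction, keeping `Count` itself immutable and
    introducing a dedicated node for the pushed-down case (the subclass
    name is hypothetical, and `Count` would need to be non-final for this
    to compile):

        // Hypothetical: a pushdown-specific node instead of a mutable
        // flag on the shared Count class.
        class PushedDownCount(children: Seq[Expression]) extends Count(children) {
          // Override only what differs when the source pre-computes the
          // count, e.g. summing partial counts instead of counting rows.
        }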

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
##########
@@ -700,6 +704,51 @@ object DataSourceStrategy
     (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
   }
 
+  private def columnAsString(e: Expression): String = e match {
+    case AttributeReference(name, _, _, _) => name
+    case Cast(child, _, _) => columnAsString(child)
+    case Add(left, right, _) =>
+      columnAsString(left) + " + " + columnAsString(right)
+    case Subtract(left, right, _) =>
+      columnAsString(left) + " - " + columnAsString(right)
+    case Multiply(left, right, _) =>
+      columnAsString(left) + " * " + columnAsString(right)
+    case Divide(left, right, _) =>
+      columnAsString(left) + " / " + columnAsString(right)
+    case CheckOverflow(child, _, _) => columnAsString(child)
+    case PromotePrecision(child) => columnAsString(child)
+    case _ => ""
+  }
+
+  protected[sql] def translateAggregate(aggregates: AggregateExpression): Option[AggregateFunc] = {
+
+    aggregates.aggregateFunction match {
+      case min: aggregate.Min =>
+        val colName = columnAsString(min.child)
+        if (colName.nonEmpty) Some(Min(colName, min.dataType)) else None
+      case max: aggregate.Max =>
+        val colName = columnAsString(max.child)
+        if (colName.nonEmpty) Some(Max(colName, max.dataType)) else None
+      case avg: aggregate.Average =>
+        val colName = columnAsString(avg.child)
+        if (colName.nonEmpty) Some(Avg(colName, avg.dataType, aggregates.isDistinct)) else None
+      case sum: aggregate.Sum =>
+        val colName = columnAsString(sum.child)
+        if (colName.nonEmpty) Some(Sum(colName, sum.dataType, aggregates.isDistinct)) else None
+      case count: aggregate.Count =>
+        val columnName = count.children.head match {
+          case Literal(_, _) =>
+            "1"

Review comment:
       nit: `          case Literal(_, _) => "1"`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org