You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/02/01 12:18:13 UTC

[spark] branch master updated: [SPARK-42278][SQL] DS V2 pushdown supports supports JDBC dialects compile `SortOrder` by themselves

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 20eb54661e5 [SPARK-42278][SQL] DS V2 pushdown supports supports JDBC dialects compile `SortOrder` by themselves
20eb54661e5 is described below

commit 20eb54661e5bf6e2e350b8311007a63f7beabc7a
Author: Jiaan Geng <be...@163.com>
AuthorDate: Wed Feb 1 20:17:39 2023 +0800

    [SPARK-42278][SQL] DS V2 pushdown supports supports JDBC dialects compile `SortOrder` by themselves
    
    ### What changes were proposed in this pull request?
    Currently, the DS V2 pushdown framework compiles `SortOrder` with hard-coded logic. This logic always emits one fixed syntax, such as `ORDER BY col ASC NULLS FIRST`.
    This is inflexible and unfriendly to databases that do not support this syntax.
    For example, `ORDER BY col ASC NULLS FIRST` is not supported by MS SQL Server, which does not recognize the `NULLS FIRST` syntax.
    
    ### Why are the changes needed?
    This PR compiles `SortOrder` via `V2ExpressionSQLBuilder`'s `visitSortOrder`, so that JDBC dialects can override it and compile `SortOrder` themselves.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    New feature.
    
    ### How was this patch tested?
    Existing test cases.
    
    Closes #39846 from beliefer/SPARK-42278.
    
    Authored-by: Jiaan Geng <be...@163.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/connector/util/V2ExpressionSQLBuilder.java     | 12 ++++++++++++
 .../sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala  |  5 +----
 2 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
index fe16174586b..9ca0fe4787f 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
@@ -27,6 +27,9 @@ import org.apache.spark.sql.connector.expressions.Extract;
 import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.connector.expressions.GeneralScalarExpression;
 import org.apache.spark.sql.connector.expressions.Literal;
+import org.apache.spark.sql.connector.expressions.NullOrdering;
+import org.apache.spark.sql.connector.expressions.SortDirection;
+import org.apache.spark.sql.connector.expressions.SortOrder;
 import org.apache.spark.sql.connector.expressions.UserDefinedScalarFunc;
 import org.apache.spark.sql.connector.expressions.aggregate.Avg;
 import org.apache.spark.sql.connector.expressions.aggregate.Max;
@@ -56,6 +59,10 @@ public class V2ExpressionSQLBuilder {
     } else if (expr instanceof Extract) {
       Extract extract = (Extract) expr;
       return visitExtract(extract.field(), build(extract.source()));
+    } else if (expr instanceof SortOrder) {
+      SortOrder sortOrder = (SortOrder) expr;
+      return visitSortOrder(
+        build(sortOrder.expression()), sortOrder.direction(), sortOrder.nullOrdering());
     } else if (expr instanceof GeneralScalarExpression) {
       GeneralScalarExpression e = (GeneralScalarExpression) expr;
       String name = e.name();
@@ -368,6 +375,11 @@ public class V2ExpressionSQLBuilder {
     return "EXTRACT(" + field + " FROM " + source + ")";
   }
 
+  protected String visitSortOrder(
+      String sortKey, SortDirection sortDirection, NullOrdering nullOrdering) {
+    return sortKey + " " + sortDirection + " " + nullOrdering;
+  }
+
   private String joinArrayToString(
       String[] inputs, CharSequence delimiter, CharSequence prefix, CharSequence suffix) {
     StringJoiner joiner = new StringJoiner(delimiter, prefix, suffix);
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala
index 5fc2edc042c..4c62c4c1c4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala
@@ -158,10 +158,7 @@ case class JDBCScanBuilder(
   override def pushTopN(orders: Array[SortOrder], limit: Int): Boolean = {
     if (jdbcOptions.pushDownLimit) {
       val dialect = JdbcDialects.get(jdbcOptions.url)
-      val compiledOrders = orders.flatMap { order =>
-        dialect.compileExpression(order.expression())
-          .map(sortKey => s"$sortKey ${order.direction()} ${order.nullOrdering()}")
-      }
+      val compiledOrders = orders.flatMap(dialect.compileExpression(_))
       if (orders.length != compiledOrders.length) return false
       pushedLimit = limit
       sortOrders = compiledOrders


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org