You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/08/10 05:37:07 UTC

[spark] branch master updated: [SPARK-39929][SQL] DS V2 supports push down string functions(non ANSI)

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b2169129df [SPARK-39929][SQL] DS V2 supports push down string  functions(non ANSI)
3b2169129df is described below

commit 3b2169129dfb1efd1e9373eb7126187676a57f3e
Author: biaobiao.sun <13...@qq.com>
AuthorDate: Wed Aug 10 13:36:51 2022 +0800

    [SPARK-39929][SQL] DS V2 supports push down string  functions(non ANSI)
    
    **What changes were proposed in this pull request?**
    
    Support more commonly used string functions:
    
    BIT_LENGTH
    CHAR_LENGTH
    CONCAT
    
    The table below shows which mainstream databases support these functions.
    
    Function | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | SQLite | Influxdata | Singlestore | ElasticSearch
    -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | --
    BIT_LENGTH | Yes | Yes | Yes | Yes | Yes | no | no | no | no | Yes | Yes | Yes | no | Yes | no | Yes | no | no | no | no | no | Yes
    CHAR_LENGTH | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | no | Yes | Yes | Yes | Yes
    CONCAT | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | no | no | no | Yes | Yes
    
    **Why are the changes needed?**
    DS V2 should support pushing down these string functions.
    
    **Does this PR introduce any user-facing change?**
    'No'.
    New feature.
    
    **How was this patch tested?**
    New tests.
    
    Closes #37427 from zheniantoushipashi/SPARK-39929.
    
    Authored-by: biaobiao.sun <13...@qq.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../connector/expressions/GeneralScalarExpression.java | 18 ++++++++++++++++++
 .../sql/connector/util/V2ExpressionSQLBuilder.java     |  3 +++
 .../spark/sql/catalyst/util/V2ExpressionBuilder.scala  |  7 ++++++-
 .../scala/org/apache/spark/sql/jdbc/H2Dialect.scala    |  3 ++-
 .../scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala  | 16 ++++++++++++++++
 5 files changed, 45 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
index 9ef0d481bc9..8339e341b8e 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
@@ -340,6 +340,24 @@ import org.apache.spark.sql.internal.connector.ToStringSQLBuilder;
  *    <li>Since version: 3.4.0</li>
  *   </ul>
  *  </li>
+ *  <li>Name: <code>BIT_LENGTH</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>BIT_LENGTH(src)</code></li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
+ *  <li>Name: <code>CHAR_LENGTH</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>CHAR_LENGTH(src)</code></li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
+ *  <li>Name: <code>CONCAT</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>CONCAT(col1, col2, ..., colN)</code></li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
  *  <li>Name: <code>OVERLAY</code>
  *   <ul>
  *    <li>SQL semantic: <code>OVERLAY(string, replace, position[, length])</code></li>
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
index 3a78a946e36..4fa132ccfd1 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
@@ -155,6 +155,9 @@ public class V2ExpressionSQLBuilder {
         case "SHA2":
         case "MD5":
         case "CRC32":
+        case "BIT_LENGTH":
+        case "CHAR_LENGTH":
+        case "CONCAT":
           return visitSQLFunction(name,
             Arrays.stream(e.children()).map(c -> build(c)).toArray(String[]::new));
         case "CASE_WHEN": {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
index 89115bf7ab5..18101ec3df8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, Expression =>
 import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Avg, Count, CountStar, GeneralAggregateFunc, Max, Min, Sum, UserDefinedAggregateFunc}
 import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And => V2And, Not => V2Not, Or => V2Or, Predicate => V2Predicate}
 import org.apache.spark.sql.execution.datasources.PushableExpression
-import org.apache.spark.sql.types.{BooleanType, IntegerType}
+import org.apache.spark.sql.types.{BooleanType, IntegerType, StringType}
 
 /**
  * The builder to generate V2 expressions from catalyst expressions.
@@ -217,6 +217,11 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) {
       generateExpressionWithName("SUBSTRING", children)
     case Upper(child) => generateExpressionWithName("UPPER", Seq(child))
     case Lower(child) => generateExpressionWithName("LOWER", Seq(child))
+    case BitLength(child) if child.dataType.isInstanceOf[StringType] =>
+      generateExpressionWithName("BIT_LENGTH", Seq(child))
+    case Length(child) if child.dataType.isInstanceOf[StringType] =>
+      generateExpressionWithName("CHAR_LENGTH", Seq(child))
+    case concat: Concat => generateExpressionWithName("CONCAT", concat.children)
     case translate: StringTranslate => generateExpressionWithName("TRANSLATE", translate.children)
     case trim: StringTrim => generateExpressionWithName("TRIM", trim.children)
     case trim: StringTrimLeft => generateExpressionWithName("LTRIM", trim.children)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 737e3de10a9..7665bb91c6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -50,7 +50,8 @@ private[sql] object H2Dialect extends JdbcDialect {
     Set("ABS", "COALESCE", "GREATEST", "LEAST", "RAND", "LOG", "LOG10", "LN", "EXP",
       "POWER", "SQRT", "FLOOR", "CEIL", "ROUND", "SIN", "SINH", "COS", "COSH", "TAN",
       "TANH", "COT", "ASIN", "ACOS", "ATAN", "ATAN2", "DEGREES", "RADIANS", "SIGN",
-      "PI", "SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM", "MD5", "SHA1", "SHA2")
+      "PI", "SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM", "MD5", "SHA1", "SHA2",
+      "BIT_LENGTH", "CHAR_LENGTH", "CONCAT")
 
   override def isSupportedFunction(funcName: String): Boolean =
     supportedFunctions.contains(funcName)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index a8c770f46cd..a5ea2589b63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -1626,6 +1626,22 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       "PushedFilters: [NAME IS NOT NULL]"
     checkPushedInfo(df5, expectedPlanFragment5)
     checkAnswer(df5, Seq(Row(6, "jen", 12000, 1200, true)))
+
+    val df6 = sql("SELECT * FROM h2.test.employee WHERE bit_length(name) = 40")
+    checkFiltersRemoved(df6)
+    checkPushedInfo(df6, "[NAME IS NOT NULL, BIT_LENGTH(NAME) = 40]")
+    checkAnswer(df6, Seq(Row(1, "cathy", 9000, 1200, false), Row(2, "david", 10000, 1300, true)))
+
+    val df7 = sql("SELECT * FROM h2.test.employee WHERE char_length(name) = 5")
+    checkFiltersRemoved(df7)
+    checkPushedInfo(df7, "[NAME IS NOT NULL, CHAR_LENGTH(NAME) = 5]")
+    checkAnswer(df6, Seq(Row(1, "cathy", 9000, 1200, false), Row(2, "david", 10000, 1300, true)))
+
+    val df8 = sql("SELECT * FROM h2.test.employee WHERE " +
+      "concat(name, ',' , cast(salary as string)) = 'cathy,9000.00'")
+    checkFiltersRemoved(df8)
+    checkPushedInfo(df8, "[(CONCAT(NAME, ',', CAST(SALARY AS string))) = 'cathy,9000.00']")
+    checkAnswer(df8, Seq(Row(1, "cathy", 9000, 1200, false)))
   }
 
   test("scan with aggregate push-down: MAX AVG with filter and group by") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org