You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/08/10 05:37:07 UTC
[spark] branch master updated: [SPARK-39929][SQL] DS V2 supports push down string functions(non ANSI)
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3b2169129df [SPARK-39929][SQL] DS V2 supports push down string functions(non ANSI)
3b2169129df is described below
commit 3b2169129dfb1efd1e9373eb7126187676a57f3e
Author: biaobiao.sun <13...@qq.com>
AuthorDate: Wed Aug 10 13:36:51 2022 +0800
[SPARK-39929][SQL] DS V2 supports push down string functions(non ANSI)
**What changes were proposed in this pull request?**
support more commonly used string functions
BIT_LENGTH
CHAR_LENGTH
CONCAT
The mainstream databases support these functions show below.
Function | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | SQLite | Influxdata | Singlestore | ElasticSearch
-- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | --
BIT_LENGTH | Yes | Yes | Yes | Yes | Yes | no | no | no | no | Yes | Yes | Yes | no | Yes | no | Yes | no | no | no | no | no | Yes
CHAR_LENGTH | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | no | Yes | Yes | Yes | Yes
CONCAT | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | no | no | no | Yes | Yes
**Why are the changes needed?**
DS V2 supports push down string functions
**Does this PR introduce any user-facing change?**
'No'.
New feature.
**How was this patch tested?**
New tests.
Closes #37427 from zheniantoushipashi/SPARK-39929.
Authored-by: biaobiao.sun <13...@qq.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../connector/expressions/GeneralScalarExpression.java | 18 ++++++++++++++++++
.../sql/connector/util/V2ExpressionSQLBuilder.java | 3 +++
.../spark/sql/catalyst/util/V2ExpressionBuilder.scala | 7 ++++++-
.../scala/org/apache/spark/sql/jdbc/H2Dialect.scala | 3 ++-
.../scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 16 ++++++++++++++++
5 files changed, 45 insertions(+), 2 deletions(-)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
index 9ef0d481bc9..8339e341b8e 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
@@ -340,6 +340,24 @@ import org.apache.spark.sql.internal.connector.ToStringSQLBuilder;
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
+ * <li>Name: <code>BIT_LENGTH</code>
+ * <ul>
+ * <li>SQL semantic: <code>BIT_LENGTH(src)</code></li>
+ * <li>Since version: 3.4.0</li>
+ * </ul>
+ * </li>
+ * <li>Name: <code>CHAR_LENGTH</code>
+ * <ul>
+ * <li>SQL semantic: <code>CHAR_LENGTH(src)</code></li>
+ * <li>Since version: 3.4.0</li>
+ * </ul>
+ * </li>
+ * <li>Name: <code>CONCAT</code>
+ * <ul>
+ * <li>SQL semantic: <code>CONCAT(col1, col2, ..., colN)</code></li>
+ * <li>Since version: 3.4.0</li>
+ * </ul>
+ * </li>
* <li>Name: <code>OVERLAY</code>
* <ul>
* <li>SQL semantic: <code>OVERLAY(string, replace, position[, length])</code></li>
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
index 3a78a946e36..4fa132ccfd1 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
@@ -155,6 +155,9 @@ public class V2ExpressionSQLBuilder {
case "SHA2":
case "MD5":
case "CRC32":
+ case "BIT_LENGTH":
+ case "CHAR_LENGTH":
+ case "CONCAT":
return visitSQLFunction(name,
Arrays.stream(e.children()).map(c -> build(c)).toArray(String[]::new));
case "CASE_WHEN": {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
index 89115bf7ab5..18101ec3df8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, Expression =>
import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Avg, Count, CountStar, GeneralAggregateFunc, Max, Min, Sum, UserDefinedAggregateFunc}
import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And => V2And, Not => V2Not, Or => V2Or, Predicate => V2Predicate}
import org.apache.spark.sql.execution.datasources.PushableExpression
-import org.apache.spark.sql.types.{BooleanType, IntegerType}
+import org.apache.spark.sql.types.{BooleanType, IntegerType, StringType}
/**
* The builder to generate V2 expressions from catalyst expressions.
@@ -217,6 +217,11 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) {
generateExpressionWithName("SUBSTRING", children)
case Upper(child) => generateExpressionWithName("UPPER", Seq(child))
case Lower(child) => generateExpressionWithName("LOWER", Seq(child))
+ case BitLength(child) if child.dataType.isInstanceOf[StringType] =>
+ generateExpressionWithName("BIT_LENGTH", Seq(child))
+ case Length(child) if child.dataType.isInstanceOf[StringType] =>
+ generateExpressionWithName("CHAR_LENGTH", Seq(child))
+ case concat: Concat => generateExpressionWithName("CONCAT", concat.children)
case translate: StringTranslate => generateExpressionWithName("TRANSLATE", translate.children)
case trim: StringTrim => generateExpressionWithName("TRIM", trim.children)
case trim: StringTrimLeft => generateExpressionWithName("LTRIM", trim.children)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 737e3de10a9..7665bb91c6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -50,7 +50,8 @@ private[sql] object H2Dialect extends JdbcDialect {
Set("ABS", "COALESCE", "GREATEST", "LEAST", "RAND", "LOG", "LOG10", "LN", "EXP",
"POWER", "SQRT", "FLOOR", "CEIL", "ROUND", "SIN", "SINH", "COS", "COSH", "TAN",
"TANH", "COT", "ASIN", "ACOS", "ATAN", "ATAN2", "DEGREES", "RADIANS", "SIGN",
- "PI", "SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM", "MD5", "SHA1", "SHA2")
+ "PI", "SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM", "MD5", "SHA1", "SHA2",
+ "BIT_LENGTH", "CHAR_LENGTH", "CONCAT")
override def isSupportedFunction(funcName: String): Boolean =
supportedFunctions.contains(funcName)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index a8c770f46cd..a5ea2589b63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -1626,6 +1626,22 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
"PushedFilters: [NAME IS NOT NULL]"
checkPushedInfo(df5, expectedPlanFragment5)
checkAnswer(df5, Seq(Row(6, "jen", 12000, 1200, true)))
+
+ val df6 = sql("SELECT * FROM h2.test.employee WHERE bit_length(name) = 40")
+ checkFiltersRemoved(df6)
+ checkPushedInfo(df6, "[NAME IS NOT NULL, BIT_LENGTH(NAME) = 40]")
+ checkAnswer(df6, Seq(Row(1, "cathy", 9000, 1200, false), Row(2, "david", 10000, 1300, true)))
+
+ val df7 = sql("SELECT * FROM h2.test.employee WHERE char_length(name) = 5")
+ checkFiltersRemoved(df7)
+ checkPushedInfo(df7, "[NAME IS NOT NULL, CHAR_LENGTH(NAME) = 5]")
+ checkAnswer(df7, Seq(Row(1, "cathy", 9000, 1200, false), Row(2, "david", 10000, 1300, true)))
+
+ val df8 = sql("SELECT * FROM h2.test.employee WHERE " +
+ "concat(name, ',' , cast(salary as string)) = 'cathy,9000.00'")
+ checkFiltersRemoved(df8)
+ checkPushedInfo(df8, "[(CONCAT(NAME, ',', CAST(SALARY AS string))) = 'cathy,9000.00']")
+ checkAnswer(df8, Seq(Row(1, "cathy", 9000, 1200, false)))
}
test("scan with aggregate push-down: MAX AVG with filter and group by") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org