You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/08/05 10:33:50 UTC
[spark] branch master updated: [SPARK-38901][SQL] DS V2 supports push down misc functions
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8a0e299947c [SPARK-38901][SQL] DS V2 supports push down misc functions
8a0e299947c is described below
commit 8a0e299947c82ec27b74c5555f7099b188a1c386
Author: chenzhx <ch...@apache.org>
AuthorDate: Fri Aug 5 18:33:30 2022 +0800
[SPARK-38901][SQL] DS V2 supports push down misc functions
### What changes were proposed in this pull request?
Currently, Spark have some misc functions. Please refer
https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L688
These functions show below:
`AES_ENCRYPT,`
`AES_DECRYPT`,
`SHA1`,
`SHA2`,
`MD5`,
`CRC32`
Function|PostgreSQL|ClickHouse|H2|MySQL|Oracle|Redshift|Snowflake|DB2|Vertica|Exasol|SqlServer|Yellowbrick|Mariadb|Singlestore|
-- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | --
`AesEncrypt`|Yes|Yes|Yes|Yes|Yes|NO|Yes|Yes|NO|NO|NO|Yes|Yes|Yes|
`AesDecrypt`|Yes|Yes|Yes|Yes|Yes|NO|Yes|Yes|NO|NO|NO|Yes|Yes|Yes|
`Sha1`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|
`Sha2`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|
`Md5`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|
`Crc32`|No|Yes|No|Yes|NO|Yes|NO|Yes|NO|NO|NO|NO|NO|Yes|
DS V2 should supports push down these misc functions.
### Why are the changes needed?
DS V2 supports push down misc functions.
### Does this PR introduce _any_ user-facing change?
'No'.
New feature.
### How was this patch tested?
New tests.
Closes #37169 from chenzhx/misc.
Authored-by: chenzhx <ch...@apache.org>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../expressions/GeneralScalarExpression.java | 36 +++++++++++
.../sql/connector/util/V2ExpressionSQLBuilder.java | 6 ++
.../sql/catalyst/util/V2ExpressionBuilder.scala | 6 ++
.../org/apache/spark/sql/jdbc/H2Dialect.scala | 19 +++++-
.../org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 71 +++++++++++++++++++---
5 files changed, 129 insertions(+), 9 deletions(-)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
index 53c511a87f6..9ef0d481bc9 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
@@ -364,6 +364,42 @@ import org.apache.spark.sql.internal.connector.ToStringSQLBuilder;
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
+ * <li>Name: <code>AES_ENCRYPT</code>
+ * <ul>
+ * <li>SQL semantic: <code>AES_ENCRYPT(expr, key[, mode[, padding]])</code></li>
+ * <li>Since version: 3.4.0</li>
+ * </ul>
+ * </li>
+ * <li>Name: <code>AES_DECRYPT</code>
+ * <ul>
+ * <li>SQL semantic: <code>AES_DECRYPT(expr, key[, mode[, padding]])</code></li>
+ * <li>Since version: 3.4.0</li>
+ * </ul>
+ * </li>
+ * <li>Name: <code>SHA1</code>
+ * <ul>
+ * <li>SQL semantic: <code>SHA1(expr)</code></li>
+ * <li>Since version: 3.4.0</li>
+ * </ul>
+ * </li>
+ * <li>Name: <code>SHA2</code>
+ * <ul>
+ * <li>SQL semantic: <code>SHA2(expr, bitLength)</code></li>
+ * <li>Since version: 3.4.0</li>
+ * </ul>
+ * </li>
+ * <li>Name: <code>MD5</code>
+ * <ul>
+ * <li>SQL semantic: <code>MD5(expr)</code></li>
+ * <li>Since version: 3.4.0</li>
+ * </ul>
+ * </li>
+ * <li>Name: <code>CRC32</code>
+ * <ul>
+ * <li>SQL semantic: <code>CRC32(expr)</code></li>
+ * <li>Since version: 3.4.0</li>
+ * </ul>
+ * </li>
* </ol>
* Note: SQL semantic conforms ANSI standard, so some expressions are not supported when ANSI off,
* including: add, subtract, multiply, divide, remainder, pmod.
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
index 541b88a5027..3a78a946e36 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
@@ -149,6 +149,12 @@ public class V2ExpressionSQLBuilder {
case "DATE_ADD":
case "DATE_DIFF":
case "TRUNC":
+ case "AES_ENCRYPT":
+ case "AES_DECRYPT":
+ case "SHA1":
+ case "SHA2":
+ case "MD5":
+ case "CRC32":
return visitSQLFunction(name,
Arrays.stream(e.children()).map(c -> build(c)).toArray(String[]::new));
case "CASE_WHEN": {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
index d451c73b39d..b274fd38e99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
@@ -258,6 +258,12 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) {
generateExpression(child).map(v => new V2Extract("WEEK", v))
case YearOfWeek(child) =>
generateExpression(child).map(v => new V2Extract("YEAR_OF_WEEK", v))
+ case encrypt: AesEncrypt => generateExpressionWithName("AES_ENCRYPT", encrypt.children)
+ case decrypt: AesDecrypt => generateExpressionWithName("AES_DECRYPT", decrypt.children)
+ case Crc32(child) => generateExpressionWithName("CRC32", Seq(child))
+ case Md5(child) => generateExpressionWithName("MD5", Seq(child))
+ case Sha1(child) => generateExpressionWithName("SHA1", Seq(child))
+ case sha2: Sha2 => generateExpressionWithName("SHA2", sha2.children)
// TODO supports other expressions
case ApplyFunctionExpression(function, children) =>
val childrenExpressions = children.flatMap(generateExpression(_))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 4200ba91fb1..737e3de10a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -50,7 +50,7 @@ private[sql] object H2Dialect extends JdbcDialect {
Set("ABS", "COALESCE", "GREATEST", "LEAST", "RAND", "LOG", "LOG10", "LN", "EXP",
"POWER", "SQRT", "FLOOR", "CEIL", "ROUND", "SIN", "SINH", "COS", "COSH", "TAN",
"TANH", "COT", "ASIN", "ACOS", "ATAN", "ATAN2", "DEGREES", "RADIANS", "SIGN",
- "PI", "SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM")
+ "PI", "SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM", "MD5", "SHA1", "SHA2")
override def isSupportedFunction(funcName: String): Boolean =
supportedFunctions.contains(funcName)
@@ -235,5 +235,22 @@ private[sql] object H2Dialect extends JdbcDialect {
}
s"EXTRACT($newField FROM $source)"
}
+
+ override def visitSQLFunction(funcName: String, inputs: Array[String]): String = {
+ if (isSupportedFunction(funcName)) {
+ funcName match {
+ case "MD5" =>
+ "RAWTOHEX(HASH('MD5', " + inputs.mkString(",") + "))"
+ case "SHA1" =>
+ "RAWTOHEX(HASH('SHA-1', " + inputs.mkString(",") + "))"
+ case "SHA2" =>
+ "RAWTOHEX(HASH('SHA-" + inputs(1) + "'," + inputs(0) + "))"
+ case _ => super.visitSQLFunction(funcName, inputs)
+ }
+ } else {
+ throw new UnsupportedOperationException(
+ s"${this.getClass.getSimpleName} does not support function: $funcName");
+ }
+ }
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index da4f9175cd5..02dff0973fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -45,6 +45,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
val tempDir = Utils.createTempDir()
val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
+ val testBytes = Array[Byte](99.toByte, 134.toByte, 135.toByte, 200.toByte, 205.toByte) ++
+ Array.fill(15)(0.toByte)
val testH2Dialect = new JdbcDialect {
override def canHandle(url: String): Boolean = H2Dialect.canHandle(url)
@@ -178,6 +180,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
"('amy', '2022-05-19', '2022-05-19 00:00:00')").executeUpdate()
conn.prepareStatement("INSERT INTO \"test\".\"datetime\" VALUES " +
"('alex', '2022-05-18', '2022-05-18 00:00:00')").executeUpdate()
+
+ conn.prepareStatement("CREATE TABLE \"test\".\"binary1\" (name TEXT(32),b BINARY(20))")
+ .executeUpdate()
+ val stmt = conn.prepareStatement("INSERT INTO \"test\".\"binary1\" VALUES (?, ?)")
+ stmt.setString(1, "jen")
+ stmt.setBytes(2, testBytes)
+ stmt.executeUpdate()
}
H2Dialect.registerFunction("my_avg", IntegralAverage)
H2Dialect.registerFunction("my_strlen", StrLen(CharLength))
@@ -860,7 +869,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkSortRemoved(df2)
checkPushedInfo(df2,
"PushedFilters: [DEPT IS NOT NULL, DEPT > 1]",
- "PushedTopN: ORDER BY [SALARY ASC NULLS FIRST] LIMIT 1")
+ "PushedTopN: ORDER BY [SALARY ASC NULLS FIRST] LIMIT 1")
checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
}
@@ -1190,6 +1199,52 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkAnswer(df8, Seq(Row("alex")))
}
+ test("scan with filter push-down with misc functions") {
+ val df1 = sql("SELECT name FROM h2.test.binary1 WHERE " +
+ "md5(b) = '4371fe0aa613bcb081543a37d241adcb'")
+ checkFiltersRemoved(df1)
+ val expectedPlanFragment1 = "PushedFilters: [B IS NOT NULL, " +
+ "MD5(B) = '4371fe0aa613bcb081543a37d241adcb']"
+ checkPushedInfo(df1, expectedPlanFragment1)
+ checkAnswer(df1, Seq(Row("jen")))
+
+ val df2 = sql("SELECT name FROM h2.test.binary1 WHERE " +
+ "sha1(b) = 'cf355e86e8666f9300ef12e996acd5c629e0b0a1'")
+ checkFiltersRemoved(df2)
+ val expectedPlanFragment2 = "PushedFilters: [B IS NOT NULL, " +
+ "SHA1(B) = 'cf355e86e8666f9300ef12e996acd5c629e0b0a1'],"
+ checkPushedInfo(df2, expectedPlanFragment2)
+ checkAnswer(df2, Seq(Row("jen")))
+
+ val df3 = sql("SELECT name FROM h2.test.binary1 WHERE " +
+ "sha2(b, 256) = '911732d10153f859dec04627df38b19290ec707ff9f83910d061421fdc476109'")
+ checkFiltersRemoved(df3)
+ val expectedPlanFragment3 = "PushedFilters: [B IS NOT NULL, (SHA2(B, 256)) = " +
+ "'911732d10153f859dec04627df38b19290ec707ff9f83910d061421fdc476109']"
+ checkPushedInfo(df3, expectedPlanFragment3)
+ checkAnswer(df3, Seq(Row("jen")))
+
+ val df4 = sql("SELECT * FROM h2.test.employee WHERE crc32(name) = '142689369'")
+ checkFiltersRemoved(df4, false)
+ val expectedPlanFragment4 = "PushedFilters: [NAME IS NOT NULL], "
+ checkPushedInfo(df4, expectedPlanFragment4)
+ checkAnswer(df4, Seq(Row(6, "jen", 12000, 1200, true)))
+
+ val df5 = sql("SELECT name FROM h2.test.employee WHERE " +
+ "aes_encrypt(cast(null as string), name) is null")
+ checkFiltersRemoved(df5, false)
+ val expectedPlanFragment5 = "PushedFilters: [], "
+ checkPushedInfo(df5, expectedPlanFragment5)
+ checkAnswer(df5, Seq(Row("amy"), Row("cathy"), Row("alex"), Row("david"), Row("jen")))
+
+ val df6 = sql("SELECT name FROM h2.test.employee WHERE " +
+ "aes_decrypt(cast(null as binary), name) is null")
+ checkFiltersRemoved(df6, false)
+ val expectedPlanFragment6 = "PushedFilters: [], "
+ checkPushedInfo(df6, expectedPlanFragment6)
+ checkAnswer(df6, Seq(Row("amy"), Row("cathy"), Row("alex"), Row("david"), Row("jen")))
+ }
+
test("scan with filter push-down with UDF") {
JdbcDialects.unregisterDialect(H2Dialect)
try {
@@ -1267,7 +1322,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
Seq(Row("test", "people", false), Row("test", "empty_table", false),
Row("test", "employee", false), Row("test", "item", false), Row("test", "dept", false),
Row("test", "person", false), Row("test", "view1", false), Row("test", "view2", false),
- Row("test", "datetime", false)))
+ Row("test", "datetime", false), Row("test", "binary1", false)))
}
test("SQL API: create table as select") {
@@ -1817,12 +1872,12 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkFiltersRemoved(df)
checkAggregateRemoved(df)
checkPushedInfo(df,
- """
- |PushedAggregates: [VAR_POP(BONUS), VAR_POP(DISTINCT BONUS),
- |VAR_SAMP(BONUS), VAR_SAMP(DISTINCT BONUS)],
- |PushedFilters: [DEPT IS NOT NULL, DEPT > 0],
- |PushedGroupByExpressions: [DEPT],
- |""".stripMargin.replaceAll("\n", " "))
+ """
+ |PushedAggregates: [VAR_POP(BONUS), VAR_POP(DISTINCT BONUS),
+ |VAR_SAMP(BONUS), VAR_SAMP(DISTINCT BONUS)],
+ |PushedFilters: [DEPT IS NOT NULL, DEPT > 0],
+ |PushedGroupByExpressions: [DEPT],
+ |""".stripMargin.replaceAll("\n", " "))
checkAnswer(df, Seq(Row(10000d, 10000d, 20000d, 20000d),
Row(2500d, 2500d, 5000d, 5000d), Row(0d, 0d, null, null)))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org