Posted to commits@spark.apache.org by we...@apache.org on 2022/07/08 03:34:47 UTC
[spark] branch master updated: [SPARK-38899][SQL] DS V2 supports push down datetime functions
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1df405fb122 [SPARK-38899][SQL] DS V2 supports push down datetime functions
1df405fb122 is described below
commit 1df405fb122fa492e2f499b9bb1cf3ba5ecfd060
Author: chenzhx <ch...@apache.org>
AuthorDate: Fri Jul 8 11:34:23 2022 +0800
[SPARK-38899][SQL] DS V2 supports push down datetime functions
### What changes were proposed in this pull request?
Currently, Spark has a number of datetime functions. Please refer to
https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L577
These functions are shown below:
`DATE_ADD`,
`DATEDIFF`,
`TRUNC`,
`EXTRACT`,
`SECOND`,
`MINUTE`,
`HOUR`,
`MONTH`,
`QUARTER`,
`YEAR`,
`DAYOFWEEK`,
`DAYOFMONTH`,
`DAYOFYEAR`
Support for these functions in mainstream databases is shown below.
Function|PostgreSQL|ClickHouse|H2|MySQL|Oracle|Presto|Teradata|Snowflake|DB2|Vertica|Exasol|Impala|Mariadb|Druid|Singlestore|ElasticSearch
-- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | --
`DateAdd`|No|Yes|Yes|Yes|Yes|Yes|No|Yes|No|No|No|Yes|Yes|No|Yes|Yes
`DateDiff`|No|Yes|Yes|Yes|Yes|Yes|No|Yes|No|Yes|No|Yes|Yes|No|Yes|Yes
`DateTrunc`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes
`Hour`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
`Minute`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
`Month`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
`Quarter`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
`Second`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
`Year`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
`DayOfMonth`|Yes|Yes|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes
`DayOfWeek`|Yes|Yes|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes|Yes|Yes|Yes|Yes
`DayOfYear`|Yes|Yes|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes|Yes|Yes|Yes|Yes
`WEEK_OF_YEAR`|Yes|No|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes|Yes|Yes|Yes|Yes
`YEAR_OF_WEEK`|No|No|Yes|Yes|Yes|Yes|No|Yes|No|No|No|No|Yes|No|No|No
DS V2 should support pushing down these datetime functions.
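As a hedged illustration (a standalone sketch, not Spark's public API; the class name `ExtractSketch` is invented here), the `EXTRACT` rendering this PR adds to `V2ExpressionSQLBuilder` can be pictured like this:

```java
// Minimal sketch of how a pushed-down datetime field access is rendered
// to SQL in this commit: EXTRACT(<field> FROM <source>).
public class ExtractSketch {

    // Mirrors the visitExtract helper added by this PR.
    static String visitExtract(String field, String source) {
        return "EXTRACT(" + field + " FROM " + source + ")";
    }

    public static void main(String[] args) {
        // e.g. Spark's dayofyear(date1) > 100 is compiled to:
        System.out.println(visitExtract("DAY_OF_YEAR", "DATE1") + " > 100");
    }
}
```

A dialect can override this rendering for engine-specific field names, as the H2 dialect in this PR does for `DAY_OF_WEEK` → `ISO_DAY_OF_WEEK`.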
### Why are the changes needed?
This lets DS V2 push datetime functions down to data sources.
### Does this PR introduce _any_ user-facing change?
No, this is a new feature.
### How was this patch tested?
New tests.
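The day-numbering translations used in the `V2ExpressionBuilder` changes below can be sketched as plain arithmetic (a hedged sketch; the class name `DayOfWeekSketch` is invented for illustration). Spark's `DAYOFWEEK` uses Sunday = 1 .. Saturday = 7, while the ISO `DAY_OF_WEEK` field uses Monday = 1 .. Sunday = 7, so the commit pushes down `((EXTRACT(DAY_OF_WEEK ...) % 7) + 1)`; `WEEKDAY` (Monday = 0) is pushed down as `(EXTRACT(DAY_OF_WEEK ...) - 1)`:

```java
// Sketch of the ISO-to-Spark day-of-week translations described in the
// V2ExpressionBuilder comments of this commit.
public class DayOfWeekSketch {

    // isoDay: Monday = 1 ... Sunday = 7 (what EXTRACT(DAY_OF_WEEK ...) returns)
    static int sparkDayOfWeek(int isoDay) {
        return (isoDay % 7) + 1; // Sunday(7) -> 1, Monday(1) -> 2, ...
    }

    // Spark's weekday(): Monday = 0 ... Sunday = 6
    static int sparkWeekDay(int isoDay) {
        return isoDay - 1; // Monday(1) -> 0, Sunday(7) -> 6
    }

    public static void main(String[] args) {
        System.out.println(sparkDayOfWeek(7)); // ISO Sunday -> Spark 1
        System.out.println(sparkDayOfWeek(1)); // ISO Monday -> Spark 2
    }
}
```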
Closes #36663 from chenzhx/datetime.
Authored-by: chenzhx <ch...@apache.org>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/connector/expressions/Extract.java | 62 +++++++++
.../expressions/GeneralScalarExpression.java | 18 +++
.../sql/connector/util/V2ExpressionSQLBuilder.java | 11 ++
.../sql/catalyst/util/V2ExpressionBuilder.scala | 57 +++++++-
.../org/apache/spark/sql/jdbc/H2Dialect.scala | 26 ++++
.../org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 146 +++++++++++++++++----
6 files changed, 296 insertions(+), 24 deletions(-)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java
new file mode 100644
index 00000000000..a925f1ee31a
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.expressions;
+
+import org.apache.spark.annotation.Evolving;
+
+import java.io.Serializable;
+
+/**
+ * Represent an extract function, which extracts and returns the value of a
+ * specified datetime field from a datetime or interval value expression.
+ * <p>
+ * The currently supported field names follow the ISO standard:
+ * <ol>
+ * <li> <code>SECOND</code> Since 3.4.0 </li>
+ * <li> <code>MINUTE</code> Since 3.4.0 </li>
+ * <li> <code>HOUR</code> Since 3.4.0 </li>
+ * <li> <code>MONTH</code> Since 3.4.0 </li>
+ * <li> <code>QUARTER</code> Since 3.4.0 </li>
+ * <li> <code>YEAR</code> Since 3.4.0 </li>
+ * <li> <code>DAY_OF_WEEK</code> Since 3.4.0 </li>
+ * <li> <code>DAY</code> Since 3.4.0 </li>
+ * <li> <code>DAY_OF_YEAR</code> Since 3.4.0 </li>
+ * <li> <code>WEEK</code> Since 3.4.0 </li>
+ * <li> <code>YEAR_OF_WEEK</code> Since 3.4.0 </li>
+ * </ol>
+ *
+ * @since 3.4.0
+ */
+
+@Evolving
+public class Extract implements Expression, Serializable {
+
+ private String field;
+ private Expression source;
+
+ public Extract(String field, Expression source) {
+ this.field = field;
+ this.source = source;
+ }
+
+ public String field() { return field; }
+ public Expression source() { return source; }
+
+ @Override
+ public Expression[] children() { return new Expression[]{ source() }; }
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
index ab9e33e86be..53c511a87f6 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
@@ -346,6 +346,24 @@ import org.apache.spark.sql.internal.connector.ToStringSQLBuilder;
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
+ * <li>Name: <code>DATE_ADD</code>
+ * <ul>
+ * <li>SQL semantic: <code>DATE_ADD(start_date, num_days)</code></li>
+ * <li>Since version: 3.4.0</li>
+ * </ul>
+ * </li>
+ * <li>Name: <code>DATE_DIFF</code>
+ * <ul>
+ * <li>SQL semantic: <code>DATE_DIFF(end_date, start_date)</code></li>
+ * <li>Since version: 3.4.0</li>
+ * </ul>
+ * </li>
+ * <li>Name: <code>TRUNC</code>
+ * <ul>
+ * <li>SQL semantic: <code>TRUNC(date, format)</code></li>
+ * <li>Since version: 3.4.0</li>
+ * </ul>
+ * </li>
* </ol>
* Note: SQL semantic conforms ANSI standard, so some expressions are not supported when ANSI off,
* including: add, subtract, multiply, divide, remainder, pmod.
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
index 9b62fedcc80..2a011026149 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
@@ -23,6 +23,7 @@ import java.util.stream.Collectors;
import org.apache.spark.sql.connector.expressions.Cast;
import org.apache.spark.sql.connector.expressions.Expression;
+import org.apache.spark.sql.connector.expressions.Extract;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.GeneralScalarExpression;
import org.apache.spark.sql.connector.expressions.Literal;
@@ -46,6 +47,9 @@ public class V2ExpressionSQLBuilder {
} else if (expr instanceof Cast) {
Cast cast = (Cast) expr;
return visitCast(build(cast.expression()), cast.dataType());
+ } else if (expr instanceof Extract) {
+ Extract extract = (Extract) expr;
+ return visitExtract(extract.field(), build(extract.source()));
} else if (expr instanceof GeneralScalarExpression) {
GeneralScalarExpression e = (GeneralScalarExpression) expr;
String name = e.name();
@@ -136,6 +140,9 @@ public class V2ExpressionSQLBuilder {
case "UPPER":
case "LOWER":
case "TRANSLATE":
+ case "DATE_ADD":
+ case "DATE_DIFF":
+ case "TRUNC":
return visitSQLFunction(name,
Arrays.stream(e.children()).map(c -> build(c)).toArray(String[]::new));
case "CASE_WHEN": {
@@ -327,4 +334,8 @@ public class V2ExpressionSQLBuilder {
return "TRIM(" + direction + " " + inputs[1] + " FROM " + inputs[0] + ")";
}
}
+
+ protected String visitExtract(String field, String source) {
+ return "EXTRACT(" + field + " FROM " + source + ")";
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
index 163e071f08e..8bb65a88044 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
@@ -18,9 +18,9 @@
package org.apache.spark.sql.catalyst.util
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, Expression => V2Expression, FieldReference, GeneralScalarExpression, LiteralValue, UserDefinedScalarFunc}
+import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, Expression => V2Expression, Extract => V2Extract, FieldReference, GeneralScalarExpression, LiteralValue, UserDefinedScalarFunc}
import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And => V2And, Not => V2Not, Or => V2Or, Predicate => V2Predicate}
-import org.apache.spark.sql.types.BooleanType
+import org.apache.spark.sql.types.{BooleanType, IntegerType}
/**
* The builder to generate V2 expressions from catalyst expressions.
@@ -344,6 +344,59 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) {
} else {
None
}
+ case date: DateAdd =>
+ val childrenExpressions = date.children.flatMap(generateExpression(_))
+ if (childrenExpressions.length == date.children.length) {
+ Some(new GeneralScalarExpression("DATE_ADD", childrenExpressions.toArray[V2Expression]))
+ } else {
+ None
+ }
+ case date: DateDiff =>
+ val childrenExpressions = date.children.flatMap(generateExpression(_))
+ if (childrenExpressions.length == date.children.length) {
+ Some(new GeneralScalarExpression("DATE_DIFF", childrenExpressions.toArray[V2Expression]))
+ } else {
+ None
+ }
+ case date: TruncDate =>
+ val childrenExpressions = date.children.flatMap(generateExpression(_))
+ if (childrenExpressions.length == date.children.length) {
+ Some(new GeneralScalarExpression("TRUNC", childrenExpressions.toArray[V2Expression]))
+ } else {
+ None
+ }
+ case Second(child, _) =>
+ generateExpression(child).map(v => new V2Extract("SECOND", v))
+ case Minute(child, _) =>
+ generateExpression(child).map(v => new V2Extract("MINUTE", v))
+ case Hour(child, _) =>
+ generateExpression(child).map(v => new V2Extract("HOUR", v))
+ case Month(child) =>
+ generateExpression(child).map(v => new V2Extract("MONTH", v))
+ case Quarter(child) =>
+ generateExpression(child).map(v => new V2Extract("QUARTER", v))
+ case Year(child) =>
+ generateExpression(child).map(v => new V2Extract("YEAR", v))
+ // DayOfWeek uses Sunday = 1, Monday = 2, ... and ISO standard is Monday = 1, ...,
+ // so we use the formula ((ISO_standard % 7) + 1) to do translation.
+ case DayOfWeek(child) =>
+ generateExpression(child).map(v => new GeneralScalarExpression("+",
+ Array[V2Expression](new GeneralScalarExpression("%",
+ Array[V2Expression](new V2Extract("DAY_OF_WEEK", v), LiteralValue(7, IntegerType))),
+ LiteralValue(1, IntegerType))))
+ // WeekDay uses Monday = 0, Tuesday = 1, ... and ISO standard is Monday = 1, ...,
+ // so we use the formula (ISO_standard - 1) to do translation.
+ case WeekDay(child) =>
+ generateExpression(child).map(v => new GeneralScalarExpression("-",
+ Array[V2Expression](new V2Extract("DAY_OF_WEEK", v), LiteralValue(1, IntegerType))))
+ case DayOfMonth(child) =>
+ generateExpression(child).map(v => new V2Extract("DAY", v))
+ case DayOfYear(child) =>
+ generateExpression(child).map(v => new V2Extract("DAY_OF_YEAR", v))
+ case WeekOfYear(child) =>
+ generateExpression(child).map(v => new V2Extract("WEEK", v))
+ case YearOfWeek(child) =>
+ generateExpression(child).map(v => new V2Extract("YEAR_OF_WEEK", v))
// TODO supports other expressions
case ApplyFunctionExpression(function, children) =>
val childrenExpressions = children.flatMap(generateExpression(_))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 1202f51ef94..f96dd5559f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -22,10 +22,12 @@ import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
+import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, GeneralAggregateFunc}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, ShortType, StringType}
@@ -132,4 +134,28 @@ private[sql] object H2Dialect extends JdbcDialect {
}
super.classifyException(message, e)
}
+
+ override def compileExpression(expr: Expression): Option[String] = {
+ val jdbcSQLBuilder = new H2JDBCSQLBuilder()
+ try {
+ Some(jdbcSQLBuilder.build(expr))
+ } catch {
+ case NonFatal(e) =>
+ logWarning("Error occurs while compiling V2 expression", e)
+ None
+ }
+ }
+
+ class H2JDBCSQLBuilder extends JDBCSQLBuilder {
+
+ override def visitExtract(field: String, source: String): String = {
+ val newField = field match {
+ case "DAY_OF_WEEK" => "ISO_DAY_OF_WEEK"
+ case "WEEK" => "ISO_WEEK"
+ case "YEAR_OF_WEEK" => "ISO_WEEK_YEAR"
+ case _ => field
+ }
+ s"EXTRACT($newField FROM $source)"
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 865d4718d68..4156ae5b279 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -181,6 +181,14 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
"(1, 'bottle', 11111111111111111111.123)").executeUpdate()
conn.prepareStatement("INSERT INTO \"test\".\"item\" VALUES " +
"(1, 'bottle', 99999999999999999999.123)").executeUpdate()
+
+ conn.prepareStatement(
+ "CREATE TABLE \"test\".\"datetime\" (name TEXT(32), date1 DATE, time1 TIMESTAMP)")
+ .executeUpdate()
+ conn.prepareStatement("INSERT INTO \"test\".\"datetime\" VALUES " +
+ "('amy', '2022-05-19', '2022-05-19 00:00:00')").executeUpdate()
+ conn.prepareStatement("INSERT INTO \"test\".\"datetime\" VALUES " +
+ "('alex', '2022-05-18', '2022-05-18 00:00:00')").executeUpdate()
}
H2Dialect.registerFunction("my_avg", IntegralAverage)
H2Dialect.registerFunction("my_strlen", StrLen(CharLength))
@@ -199,9 +207,11 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
}
private def checkPushedInfo(df: DataFrame, expectedPlanFragment: String*): Unit = {
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- checkKeywordsExistsInExplain(df, expectedPlanFragment: _*)
+ withSQLConf(SQLConf.MAX_METADATA_STRING_LENGTH.key -> "1000") {
+ df.queryExecution.optimizedPlan.collect {
+ case _: DataSourceV2ScanRelation =>
+ checkKeywordsExistsInExplain(df, expectedPlanFragment: _*)
+ }
}
}
@@ -744,8 +754,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkSortRemoved(df9)
checkLimitRemoved(df9)
checkPushedInfo(df9, "PushedFilters: [], " +
- "PushedTopN: ORDER BY [CASE WHEN (SALARY > 8000.00) AND " +
- "(SALARY < 10000.00) THEN SALARY ELSE 0.00 END ASC NULL..., ")
+ "PushedTopN: " +
+ "ORDER BY [CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+ "ASC NULLS FIRST, DEPT ASC NULLS FIRST, SALARY ASC NULLS FIRST] LIMIT 3,")
checkAnswer(df9,
Seq(Row(1, "amy", 10000, 0), Row(2, "david", 10000, 0), Row(2, "alex", 12000, 0)))
@@ -762,8 +773,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkSortRemoved(df10, false)
checkLimitRemoved(df10, false)
checkPushedInfo(df10, "PushedFilters: [], " +
- "PushedTopN: ORDER BY [CASE WHEN (SALARY > 8000.00) AND " +
- "(SALARY < 10000.00) THEN SALARY ELSE 0.00 END ASC NULL..., ")
+ "PushedTopN: " +
+ "ORDER BY [CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+ "ASC NULLS FIRST, DEPT ASC NULLS FIRST, SALARY ASC NULLS FIRST] LIMIT 3,")
checkAnswer(df10,
Seq(Row(1, "amy", 10000, 0), Row(2, "david", 10000, 0), Row(2, "alex", 12000, 0)))
}
@@ -880,8 +892,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.filter(floor($"bonus") === 1200)
.filter(ceil($"bonus") === 1200)
checkFiltersRemoved(df13)
- checkPushedInfo(df13, "PushedFilters: [BONUS IS NOT NULL, LN(BONUS) > 7.0, EXP(BONUS) > 0.0, " +
- "(POWER(BONUS, 2.0)) = 1440000.0, SQRT(BONU...,")
+ checkPushedInfo(df13, "PushedFilters: " +
+ "[BONUS IS NOT NULL, LN(BONUS) > 7.0, EXP(BONUS) > 0.0, (POWER(BONUS, 2.0)) = 1440000.0, " +
+ "SQRT(BONUS) > 34.0, FLOOR(BONUS) = 1200, CEIL(BONUS) = 1200],")
checkAnswer(df13, Seq(Row(1, "cathy", 9000, 1200, false),
Row(2, "alex", 12000, 1200, false), Row(6, "jen", 12000, 1200, true)))
@@ -903,8 +916,10 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.filter(radians($"bonus") > 20)
.filter(signum($"bonus") === 1)
checkFiltersRemoved(df15)
- checkPushedInfo(df15, "PushedFilters: [BONUS IS NOT NULL, (LOG(2.0, BONUS)) > 10.0, " +
- "LOG10(BONUS) > 3.0, (ROUND(BONUS, 0)) = 1200.0, DEG...,")
+ checkPushedInfo(df15, "PushedFilters: " +
+ "[BONUS IS NOT NULL, (LOG(2.0, BONUS)) > 10.0, LOG10(BONUS) > 3.0, " +
+ "(ROUND(BONUS, 0)) = 1200.0, DEGREES(BONUS) > 68754.0, RADIANS(BONUS) > 20.0, " +
+ "SIGN(BONUS) = 1.0],")
checkAnswer(df15, Seq(Row(1, "cathy", 9000, 1200, false),
Row(2, "alex", 12000, 1200, false), Row(6, "jen", 12000, 1200, true)))
@@ -921,8 +936,10 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.filter(atan($"bonus") > 1.4)
.filter(atan2($"bonus", $"bonus") > 0.7)
checkFiltersRemoved(df16)
- checkPushedInfo(df16, "PushedFilters: [BONUS IS NOT NULL, SIN(BONUS) < -0.08, " +
- "SINH(BONUS) > 200.0, COS(BONUS) > 0.9, COSH(BONUS) > 200....,")
+ checkPushedInfo(df16, "PushedFilters: [" +
+ "BONUS IS NOT NULL, SIN(BONUS) < -0.08, SINH(BONUS) > 200.0, COS(BONUS) > 0.9, " +
+ "COSH(BONUS) > 200.0, TAN(BONUS) < -0.08, TANH(BONUS) = 1.0, COT(BONUS) < -11.0, " +
+ "ASIN(BONUS) > 0.1, ACOS(BONUS) > 1.4, ATAN(BONUS) > 1.4, (ATAN2(BONUS, BONUS)) > 0.7],")
checkAnswer(df16, Seq(Row(1, "cathy", 9000, 1200, false),
Row(2, "alex", 12000, 1200, false), Row(6, "jen", 12000, 1200, true)))
@@ -1025,18 +1042,92 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
|AND cast(dept as short) > 1
|AND cast(bonus as decimal(20, 2)) > 1200""".stripMargin)
checkFiltersRemoved(df6, ansiMode)
- val expectedPlanFragment8 = if (ansiMode) {
+ val expectedPlanFragment6 = if (ansiMode) {
"PushedFilters: [BONUS IS NOT NULL, DEPT IS NOT NULL, " +
- "CAST(BONUS AS string) LIKE '%30%', CAST(DEPT AS byte) > 1, ...,"
+ "CAST(BONUS AS string) LIKE '%30%', CAST(DEPT AS byte) > 1, " +
+ "CAST(DEPT AS short) > 1, CAST(BONUS AS decimal(20,2)) > 1200.00]"
} else {
"PushedFilters: [BONUS IS NOT NULL, DEPT IS NOT NULL],"
}
- checkPushedInfo(df6, expectedPlanFragment8)
+ checkPushedInfo(df6, expectedPlanFragment6)
checkAnswer(df6, Seq(Row(2, "david", 10000, 1300, true)))
}
}
}
+ test("scan with filter push-down with date time functions") {
+ val df1 = sql("SELECT name FROM h2.test.datetime WHERE " +
+ "dayofyear(date1) > 100 AND dayofmonth(date1) > 10 ")
+ checkFiltersRemoved(df1)
+ val expectedPlanFragment1 =
+ "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(DAY_OF_YEAR FROM DATE1) > 100, " +
+ "EXTRACT(DAY FROM DATE1) > 10]"
+ checkPushedInfo(df1, expectedPlanFragment1)
+ checkAnswer(df1, Seq(Row("amy"), Row("alex")))
+
+ val df2 = sql("SELECT name FROM h2.test.datetime WHERE " +
+ "year(date1) = 2022 AND quarter(date1) = 2")
+ checkFiltersRemoved(df2)
+ val expectedPlanFragment2 =
+ "[DATE1 IS NOT NULL, EXTRACT(YEAR FROM DATE1) = 2022, " +
+ "EXTRACT(QUARTER FROM DATE1) = 2]"
+ checkPushedInfo(df2, expectedPlanFragment2)
+ checkAnswer(df2, Seq(Row("amy"), Row("alex")))
+
+ val df3 = sql("SELECT name FROM h2.test.datetime WHERE " +
+ "second(time1) = 0 AND month(date1) = 5")
+ checkFiltersRemoved(df3)
+ val expectedPlanFragment3 =
+ "PushedFilters: [TIME1 IS NOT NULL, DATE1 IS NOT NULL, " +
+ "EXTRACT(SECOND FROM TIME1) = 0, EXTRACT(MONTH FROM DATE1) = 5]"
+ checkPushedInfo(df3, expectedPlanFragment3)
+ checkAnswer(df3, Seq(Row("amy"), Row("alex")))
+
+ val df4 = sql("SELECT name FROM h2.test.datetime WHERE " +
+ "hour(time1) = 0 AND minute(time1) = 0")
+ checkFiltersRemoved(df4)
+ val expectedPlanFragment4 =
+ "PushedFilters: [TIME1 IS NOT NULL, EXTRACT(HOUR FROM TIME1) = 0, " +
+ "EXTRACT(MINUTE FROM TIME1) = 0]"
+ checkPushedInfo(df4, expectedPlanFragment4)
+ checkAnswer(df4, Seq(Row("amy"), Row("alex")))
+
+ val df5 = sql("SELECT name FROM h2.test.datetime WHERE " +
+ "extract(WEEk from date1) > 10 AND extract(YEAROFWEEK from date1) = 2022")
+ checkFiltersRemoved(df5)
+ val expectedPlanFragment5 =
+ "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(WEEK FROM DATE1) > 10, " +
+ "EXTRACT(YEAR_OF_WEEK FROM DATE1) = 2022]"
+ checkPushedInfo(df5, expectedPlanFragment5)
+ checkAnswer(df5, Seq(Row("alex"), Row("amy")))
+
+ // H2 does not support pushing down trunc, date_add and datediff
+ val df6 = sql("SELECT name FROM h2.test.datetime WHERE " +
+ "trunc(date1, 'week') = date'2022-05-16' AND date_add(date1, 1) = date'2022-05-20' " +
+ "AND datediff(date1, '2022-05-10') > 0")
+ checkFiltersRemoved(df6, false)
+ val expectedPlanFragment6 =
+ "PushedFilters: [DATE1 IS NOT NULL]"
+ checkPushedInfo(df6, expectedPlanFragment6)
+ checkAnswer(df6, Seq(Row("amy")))
+
+ val df7 = sql("SELECT name FROM h2.test.datetime WHERE " +
+ "weekday(date1) = 2")
+ checkFiltersRemoved(df7)
+ val expectedPlanFragment7 =
+ "PushedFilters: [DATE1 IS NOT NULL, (EXTRACT(DAY_OF_WEEK FROM DATE1) - 1) = 2]"
+ checkPushedInfo(df7, expectedPlanFragment7)
+ checkAnswer(df7, Seq(Row("alex")))
+
+ val df8 = sql("SELECT name FROM h2.test.datetime WHERE " +
+ "dayofweek(date1) = 4")
+ checkFiltersRemoved(df8)
+ val expectedPlanFragment8 =
+ "PushedFilters: [DATE1 IS NOT NULL, ((EXTRACT(DAY_OF_WEEK FROM DATE1) % 7) + 1) = 4]"
+ checkPushedInfo(df8, expectedPlanFragment8)
+ checkAnswer(df8, Seq(Row("alex")))
+ }
+
test("scan with filter push-down with UDF") {
JdbcDialects.unregisterDialect(H2Dialect)
try {
@@ -1115,7 +1206,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkAnswer(sql("SHOW TABLES IN h2.test"),
Seq(Row("test", "people", false), Row("test", "empty_table", false),
Row("test", "employee", false), Row("test", "item", false), Row("test", "dept", false),
- Row("test", "person", false), Row("test", "view1", false), Row("test", "view2", false)))
+ Row("test", "person", false), Row("test", "view1", false), Row("test", "view2", false),
+ Row("test", "datetime", false)))
}
test("SQL API: create table as select") {
@@ -1213,7 +1305,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkFiltersRemoved(df2)
val expectedPlanFragment2 =
"PushedFilters: [NAME IS NOT NULL, TRIM(BOTH FROM NAME) = 'jen', " +
- "(TRIM(BOTH 'j' FROM NAME)) = 'en', (TRANSLATE(NA..."
+ "(TRIM(BOTH 'j' FROM NAME)) = 'en', (TRANSLATE(NAME, 'e', '1')) = 'j1n']"
checkPushedInfo(df2, expectedPlanFragment2)
checkAnswer(df2, Seq(Row(6, "jen", 12000, 1200, true)))
@@ -1803,10 +1895,20 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
""".stripMargin)
checkAggregateRemoved(df)
checkPushedInfo(df,
- "PushedAggregates: [COUNT(CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00)" +
- " THEN SALARY ELSE 0.00 END), COUNT(CAS..., " +
- "PushedFilters: [], " +
- "PushedGroupByExpressions: [DEPT], ")
+ "PushedAggregates: " +
+ "[COUNT(CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END), " +
+ "COUNT(CASE WHEN (SALARY > 8000.00) AND (SALARY <= 13000.00) THEN SALARY ELSE 0.00 END), " +
+ "COUNT(CASE WHEN (SALARY > 11000.00) OR (SALARY < 10000.00) THEN SALARY ELSE 0.00 END), " +
+ "COUNT(CASE WHEN (SALARY >= 12000.00) OR (SALARY < 9000.00) THEN SALARY ELSE 0.00 END), " +
+ "MAX(CASE WHEN (SALARY <= 10000.00) AND (SALARY >= 8000.00) THEN SALARY ELSE 0.00 END), " +
+ "MAX(CASE WHEN (SALARY <= 9000.00) OR (SALARY > 10000.00) THEN SALARY ELSE 0.00 END), " +
+ "MAX(CASE WHEN (SALARY = 0.00) OR (SALARY >= 8000.00) THEN SALARY ELSE 0.00 END), " +
+ "MAX(CASE WHEN (SALARY <= 8000.00) OR (SALARY >= 10000.00) THEN 0.00 ELSE SALARY END), " +
+ "MIN(CASE WHEN (SALARY <= 8000.00) AND (SALARY IS NOT NULL) THEN SALARY ELSE 0.00 END), " +
+ "SUM(CASE WHEN SALARY > 10000.00 THEN 2 WHEN SALARY > 8000.00 THEN 1 END), " +
+ "AVG(CASE WHEN (SALARY <= 8000.00) AND (SALARY IS NULL) THEN SALARY ELSE 0.00 END)], " +
+ "PushedFilters: [], " +
+ "PushedGroupByExpressions: [DEPT],")
checkAnswer(df, Seq(Row(1, 1, 1, 1, 1, 0d, 12000d, 0d, 12000d, 0d, 0d, 2, 0d),
Row(2, 2, 2, 2, 2, 10000d, 12000d, 10000d, 12000d, 0d, 0d, 3, 0d),
Row(2, 2, 2, 2, 2, 10000d, 9000d, 10000d, 10000d, 9000d, 0d, 2, 0d)))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org