Posted to commits@spark.apache.org by we...@apache.org on 2022/07/08 03:34:47 UTC

[spark] branch master updated: [SPARK-38899][SQL] DS V2 supports push down datetime functions

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1df405fb122 [SPARK-38899][SQL] DS V2 supports push down datetime functions
1df405fb122 is described below

commit 1df405fb122fa492e2f499b9bb1cf3ba5ecfd060
Author: chenzhx <ch...@apache.org>
AuthorDate: Fri Jul 8 11:34:23 2022 +0800

    [SPARK-38899][SQL] DS V2 supports push down datetime functions
    
    ### What changes were proposed in this pull request?
    
    Currently, Spark has some datetime functions. Please refer to
    https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L577
    
    These functions are shown below (a usage sketch follows the list):
    `DATE_ADD`,
    `DATEDIFF`,
    `TRUNC`,
    `EXTRACT`,
    `SECOND`,
    `MINUTE`,
    `HOUR`,
    `MONTH`,
    `QUARTER`,
    `YEAR`,
    `DAYOFWEEK`,
    `DAYOFMONTH`,
    `DAYOFYEAR`
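
    For illustration, a query that touches several of these functions; a
    minimal sketch, assuming a hypothetical table `events` with a DATE
    column `d` and a TIMESTAMP column `ts`:

    ```scala
    // Each predicate uses one of the datetime functions listed above; after
    // this change they become candidates for DS V2 push-down.
    spark.sql(
      """SELECT *
        |FROM events
        |WHERE year(d) = 2022 AND quarter(d) = 2 AND month(d) = 5
        |  AND dayofyear(d) > 100 AND dayofmonth(d) > 10
        |  AND hour(ts) = 0 AND minute(ts) = 0 AND second(ts) = 0
        |""".stripMargin)
    ```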
    
    Mainstream database support for these functions is shown below.
    Function|PostgreSQL|ClickHouse|H2|MySQL|Oracle|Presto|Teradata|Snowflake|DB2|Vertica|Exasol|Impala|Mariadb|Druid|Singlestore|ElasticSearch
    -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | --
    `DateAdd`|No|Yes|Yes|Yes|Yes|Yes|No|Yes|No|No|No|Yes|Yes|No|Yes|Yes
    `DateDiff`|No|Yes|Yes|Yes|Yes|Yes|No|Yes|No|Yes|No|Yes|Yes|No|Yes|Yes
    `DateTrunc`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes
    `Hour`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
    `Minute`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
    `Month`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
    `Quarter`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
    `Second`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
    `Year`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
    `DayOfMonth`|Yes|Yes|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes
    `DayOfWeek`|Yes|Yes|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes|Yes|Yes|Yes|Yes
    `DayOfYear`|Yes|Yes|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes|Yes|Yes|Yes|Yes
    `WEEK_OF_YEAR`|Yes|No|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes|Yes|Yes|Yes|Yes
    `YEAR_OF_WEEK`|No|No|Yes|Yes|Yes|Yes|No|Yes|No|No|No|No|Yes|No|No|No
    
    DS V2 should support pushing down these datetime functions.
    
    ### Why are the changes needed?
    
    So that DS V2 can push these datetime functions down to data sources, most of which support them natively (see the table above).
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is a new feature.
    
    ### How was this patch tested?
    
    New tests.
    
    Closes #36663 from chenzhx/datetime.
    
    Authored-by: chenzhx <ch...@apache.org>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/connector/expressions/Extract.java   |  62 +++++++++
 .../expressions/GeneralScalarExpression.java       |  18 +++
 .../sql/connector/util/V2ExpressionSQLBuilder.java |  11 ++
 .../sql/catalyst/util/V2ExpressionBuilder.scala    |  57 +++++++-
 .../org/apache/spark/sql/jdbc/H2Dialect.scala      |  26 ++++
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala    | 146 +++++++++++++++++----
 6 files changed, 296 insertions(+), 24 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java
new file mode 100644
index 00000000000..a925f1ee31a
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.expressions;
+
+import org.apache.spark.annotation.Evolving;
+
+import java.io.Serializable;
+
+/**
+ * Represents an extract function, which extracts and returns the value of a
+ * specified datetime field from a datetime or interval value expression.
+ * <p>
+ * The currently supported field names, following the ISO standard, are:
+ * <ol>
+ *  <li> <code>SECOND</code> Since 3.4.0 </li>
+ *  <li> <code>MINUTE</code> Since 3.4.0 </li>
+ *  <li> <code>HOUR</code> Since 3.4.0 </li>
+ *  <li> <code>MONTH</code> Since 3.4.0 </li>
+ *  <li> <code>QUARTER</code> Since 3.4.0 </li>
+ *  <li> <code>YEAR</code> Since 3.4.0 </li>
+ *  <li> <code>DAY_OF_WEEK</code> Since 3.4.0 </li>
+ *  <li> <code>DAY</code> Since 3.4.0 </li>
+ *  <li> <code>DAY_OF_YEAR</code> Since 3.4.0 </li>
+ *  <li> <code>WEEK</code> Since 3.4.0 </li>
+ *  <li> <code>YEAR_OF_WEEK</code> Since 3.4.0 </li>
+ * </ol>
+ *
+ * @since 3.4.0
+ */
+
+@Evolving
+public class Extract implements Expression, Serializable {
+
+  private String field;
+  private Expression source;
+
+  public Extract(String field, Expression source) {
+    this.field = field;
+    this.source = source;
+  }
+
+  public String field() { return field; }
+  public Expression source() { return source; }
+
+  @Override
+  public Expression[] children() { return new Expression[]{ source() }; }
+}
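
A minimal sketch of constructing and inspecting the new expression from connector code (the column name "date1" is illustrative):

```scala
import org.apache.spark.sql.connector.expressions.{Extract, FieldReference}

// EXTRACT(YEAR FROM date1) as a V2 expression node.
val extract = new Extract("YEAR", FieldReference("date1"))
assert(extract.field() == "YEAR")
assert(extract.children().length == 1) // children() wraps the single source
```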
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
index ab9e33e86be..53c511a87f6 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
@@ -346,6 +346,24 @@ import org.apache.spark.sql.internal.connector.ToStringSQLBuilder;
  *    <li>Since version: 3.4.0</li>
  *   </ul>
  *  </li>
+ *  <li>Name: <code>DATE_ADD</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>DATE_ADD(start_date, num_days)</code></li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
+ *  <li>Name: <code>DATE_DIFF</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>DATE_DIFF(end_date, start_date)</code></li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
+ *  <li>Name: <code>TRUNC</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>TRUNC(date, format)</code></li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
  * </ol>
  * Note: SQL semantics conform to the ANSI standard, so some expressions are not supported when ANSI is off,
  * including: add, subtract, multiply, divide, remainder, pmod.
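
For example, Spark's `date_add(start_date, 1)` maps onto the new `DATE_ADD` name as below; a sketch reusing the constructor shape seen later in this commit (the column name "start_date" is illustrative):

```scala
import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, FieldReference, GeneralScalarExpression, LiteralValue}
import org.apache.spark.sql.types.IntegerType

// DATE_ADD(start_date, 1): a function name plus ordered child expressions.
val dateAdd = new GeneralScalarExpression("DATE_ADD",
  Array[V2Expression](FieldReference("start_date"), LiteralValue(1, IntegerType)))
```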
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
index 9b62fedcc80..2a011026149 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
@@ -23,6 +23,7 @@ import java.util.stream.Collectors;
 
 import org.apache.spark.sql.connector.expressions.Cast;
 import org.apache.spark.sql.connector.expressions.Expression;
+import org.apache.spark.sql.connector.expressions.Extract;
 import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.connector.expressions.GeneralScalarExpression;
 import org.apache.spark.sql.connector.expressions.Literal;
@@ -46,6 +47,9 @@ public class V2ExpressionSQLBuilder {
     } else if (expr instanceof Cast) {
       Cast cast = (Cast) expr;
       return visitCast(build(cast.expression()), cast.dataType());
+    } else if (expr instanceof Extract) {
+      Extract extract = (Extract) expr;
+      return visitExtract(extract.field(), build(extract.source()));
     } else if (expr instanceof GeneralScalarExpression) {
       GeneralScalarExpression e = (GeneralScalarExpression) expr;
       String name = e.name();
@@ -136,6 +140,9 @@ public class V2ExpressionSQLBuilder {
         case "UPPER":
         case "LOWER":
         case "TRANSLATE":
+        case "DATE_ADD":
+        case "DATE_DIFF":
+        case "TRUNC":
           return visitSQLFunction(name,
             Arrays.stream(e.children()).map(c -> build(c)).toArray(String[]::new));
         case "CASE_WHEN": {
@@ -327,4 +334,8 @@ public class V2ExpressionSQLBuilder {
       return "TRIM(" + direction + " " + inputs[1] + " FROM " + inputs[0] + ")";
     }
   }
+
+  protected String visitExtract(String field, String source) {
+    return "EXTRACT(" + field + " FROM " + source + ")";
+  }
 }
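
With these hooks in place, the default builder renders the new shapes roughly as below; a sketch, assuming the builder is instantiated directly:

```scala
import org.apache.spark.sql.connector.expressions.{Extract, FieldReference}
import org.apache.spark.sql.connector.util.V2ExpressionSQLBuilder

val builder = new V2ExpressionSQLBuilder()
// visitExtract renders EXTRACT(<field> FROM <source SQL>),
// e.g. EXTRACT(YEAR FROM date1).
println(builder.build(new Extract("YEAR", FieldReference("date1"))))
```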
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
index 163e071f08e..8bb65a88044 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
@@ -18,9 +18,9 @@
 package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, Expression => V2Expression, FieldReference, GeneralScalarExpression, LiteralValue, UserDefinedScalarFunc}
+import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, Expression => V2Expression, Extract => V2Extract, FieldReference, GeneralScalarExpression, LiteralValue, UserDefinedScalarFunc}
 import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And => V2And, Not => V2Not, Or => V2Or, Predicate => V2Predicate}
-import org.apache.spark.sql.types.BooleanType
+import org.apache.spark.sql.types.{BooleanType, IntegerType}
 
 /**
  * The builder to generate V2 expressions from catalyst expressions.
@@ -344,6 +344,59 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) {
       } else {
         None
       }
+    case date: DateAdd =>
+      val childrenExpressions = date.children.flatMap(generateExpression(_))
+      if (childrenExpressions.length == date.children.length) {
+        Some(new GeneralScalarExpression("DATE_ADD", childrenExpressions.toArray[V2Expression]))
+      } else {
+        None
+      }
+    case date: DateDiff =>
+      val childrenExpressions = date.children.flatMap(generateExpression(_))
+      if (childrenExpressions.length == date.children.length) {
+        Some(new GeneralScalarExpression("DATE_DIFF", childrenExpressions.toArray[V2Expression]))
+      } else {
+        None
+      }
+    case date: TruncDate =>
+      val childrenExpressions = date.children.flatMap(generateExpression(_))
+      if (childrenExpressions.length == date.children.length) {
+        Some(new GeneralScalarExpression("TRUNC", childrenExpressions.toArray[V2Expression]))
+      } else {
+        None
+      }
+    case Second(child, _) =>
+      generateExpression(child).map(v => new V2Extract("SECOND", v))
+    case Minute(child, _) =>
+      generateExpression(child).map(v => new V2Extract("MINUTE", v))
+    case Hour(child, _) =>
+      generateExpression(child).map(v => new V2Extract("HOUR", v))
+    case Month(child) =>
+      generateExpression(child).map(v => new V2Extract("MONTH", v))
+    case Quarter(child) =>
+      generateExpression(child).map(v => new V2Extract("QUARTER", v))
+    case Year(child) =>
+      generateExpression(child).map(v => new V2Extract("YEAR", v))
+    // DayOfWeek uses Sunday = 1, Monday = 2, ... and ISO standard is Monday = 1, ...,
+    // so we use the formula ((ISO_standard % 7) + 1) to do translation.
+    case DayOfWeek(child) =>
+      generateExpression(child).map(v => new GeneralScalarExpression("+",
+        Array[V2Expression](new GeneralScalarExpression("%",
+          Array[V2Expression](new V2Extract("DAY_OF_WEEK", v), LiteralValue(7, IntegerType))),
+          LiteralValue(1, IntegerType))))
+    // WeekDay uses Monday = 0, Tuesday = 1, ... and ISO standard is Monday = 1, ...,
+    // so we use the formula (ISO_standard - 1) to do translation.
+    case WeekDay(child) =>
+      generateExpression(child).map(v => new GeneralScalarExpression("-",
+        Array[V2Expression](new V2Extract("DAY_OF_WEEK", v), LiteralValue(1, IntegerType))))
+    case DayOfMonth(child) =>
+      generateExpression(child).map(v => new V2Extract("DAY", v))
+    case DayOfYear(child) =>
+      generateExpression(child).map(v => new V2Extract("DAY_OF_YEAR", v))
+    case WeekOfYear(child) =>
+      generateExpression(child).map(v => new V2Extract("WEEK", v))
+    case YearOfWeek(child) =>
+      generateExpression(child).map(v => new V2Extract("YEAR_OF_WEEK", v))
     // TODO supports other expressions
     case ApplyFunctionExpression(function, children) =>
       val childrenExpressions = children.flatMap(generateExpression(_))
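
A quick sanity check of the two day-of-week translations above, as a standalone sketch:

```scala
// ISO numbering: Monday = 1 ... Sunday = 7.
val isoDays = 1 to 7
// DayOfWeek (Sunday = 1 ... Saturday = 7): ((iso % 7) + 1)
isoDays.map(d => (d % 7) + 1) // Vector(2, 3, 4, 5, 6, 7, 1)
// WeekDay (Monday = 0 ... Sunday = 6): (iso - 1)
isoDays.map(_ - 1)            // Vector(0, 1, 2, 3, 4, 5, 6)
```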
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 1202f51ef94..f96dd5559f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -22,10 +22,12 @@ import java.util.Locale
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
+import org.apache.spark.sql.connector.expressions.Expression
 import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, GeneralAggregateFunc}
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, ShortType, StringType}
@@ -132,4 +134,28 @@ private[sql] object H2Dialect extends JdbcDialect {
     }
     super.classifyException(message, e)
   }
+
+  override def compileExpression(expr: Expression): Option[String] = {
+    val jdbcSQLBuilder = new H2JDBCSQLBuilder()
+    try {
+      Some(jdbcSQLBuilder.build(expr))
+    } catch {
+      case NonFatal(e) =>
+        logWarning("Error occurs while compiling V2 expression", e)
+        None
+    }
+  }
+
+  class H2JDBCSQLBuilder extends JDBCSQLBuilder {
+
+    override def visitExtract(field: String, source: String): String = {
+      val newField = field match {
+        case "DAY_OF_WEEK" => "ISO_DAY_OF_WEEK"
+        case "WEEK" => "ISO_WEEK"
+        case "YEAR_OF_WEEK" => "ISO_WEEK_YEAR"
+        case _ => field
+      }
+      s"EXTRACT($newField FROM $source)"
+    }
+  }
 }
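
Other dialects can follow the same pattern: override visitExtract and rename only the fields whose spelling differs. A hypothetical sketch (the dialect, its URL prefix and the "DOY" rename are illustrative, not part of this commit); as in H2Dialect, such a dialect would also override compileExpression to route through its builder:

```scala
import org.apache.spark.sql.jdbc.JdbcDialect

object MyDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")

  class MyJDBCSQLBuilder extends JDBCSQLBuilder {
    override def visitExtract(field: String, source: String): String = {
      val newField = field match {
        case "DAY_OF_YEAR" => "DOY" // illustrative rename only
        case _ => field
      }
      s"EXTRACT($newField FROM $source)"
    }
  }
}
```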
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 865d4718d68..4156ae5b279 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -181,6 +181,14 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         "(1, 'bottle', 11111111111111111111.123)").executeUpdate()
       conn.prepareStatement("INSERT INTO \"test\".\"item\" VALUES " +
         "(1, 'bottle', 99999999999999999999.123)").executeUpdate()
+
+      conn.prepareStatement(
+        "CREATE TABLE \"test\".\"datetime\" (name TEXT(32), date1 DATE, time1 TIMESTAMP)")
+        .executeUpdate()
+      conn.prepareStatement("INSERT INTO \"test\".\"datetime\" VALUES " +
+        "('amy', '2022-05-19', '2022-05-19 00:00:00')").executeUpdate()
+      conn.prepareStatement("INSERT INTO \"test\".\"datetime\" VALUES " +
+        "('alex', '2022-05-18', '2022-05-18 00:00:00')").executeUpdate()
     }
     H2Dialect.registerFunction("my_avg", IntegralAverage)
     H2Dialect.registerFunction("my_strlen", StrLen(CharLength))
@@ -199,9 +207,11 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
   }
 
   private def checkPushedInfo(df: DataFrame, expectedPlanFragment: String*): Unit = {
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        checkKeywordsExistsInExplain(df, expectedPlanFragment: _*)
+    withSQLConf(SQLConf.MAX_METADATA_STRING_LENGTH.key -> "1000") {
+      df.queryExecution.optimizedPlan.collect {
+        case _: DataSourceV2ScanRelation =>
+          checkKeywordsExistsInExplain(df, expectedPlanFragment: _*)
+      }
     }
   }
 
@@ -744,8 +754,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkSortRemoved(df9)
     checkLimitRemoved(df9)
     checkPushedInfo(df9, "PushedFilters: [], " +
-      "PushedTopN: ORDER BY [CASE WHEN (SALARY > 8000.00) AND " +
-      "(SALARY < 10000.00) THEN SALARY ELSE 0.00 END ASC NULL..., ")
+      "PushedTopN: " +
+      "ORDER BY [CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+      "ASC NULLS FIRST, DEPT ASC NULLS FIRST, SALARY ASC NULLS FIRST] LIMIT 3,")
     checkAnswer(df9,
       Seq(Row(1, "amy", 10000, 0), Row(2, "david", 10000, 0), Row(2, "alex", 12000, 0)))
 
@@ -762,8 +773,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkSortRemoved(df10, false)
     checkLimitRemoved(df10, false)
     checkPushedInfo(df10, "PushedFilters: [], " +
-      "PushedTopN: ORDER BY [CASE WHEN (SALARY > 8000.00) AND " +
-      "(SALARY < 10000.00) THEN SALARY ELSE 0.00 END ASC NULL..., ")
+      "PushedTopN: " +
+      "ORDER BY [CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+      "ASC NULLS FIRST, DEPT ASC NULLS FIRST, SALARY ASC NULLS FIRST] LIMIT 3,")
     checkAnswer(df10,
       Seq(Row(1, "amy", 10000, 0), Row(2, "david", 10000, 0), Row(2, "alex", 12000, 0)))
   }
@@ -880,8 +892,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .filter(floor($"bonus") === 1200)
       .filter(ceil($"bonus") === 1200)
     checkFiltersRemoved(df13)
-    checkPushedInfo(df13, "PushedFilters: [BONUS IS NOT NULL, LN(BONUS) > 7.0, EXP(BONUS) > 0.0, " +
-      "(POWER(BONUS, 2.0)) = 1440000.0, SQRT(BONU...,")
+    checkPushedInfo(df13, "PushedFilters: " +
+      "[BONUS IS NOT NULL, LN(BONUS) > 7.0, EXP(BONUS) > 0.0, (POWER(BONUS, 2.0)) = 1440000.0, " +
+      "SQRT(BONUS) > 34.0, FLOOR(BONUS) = 1200, CEIL(BONUS) = 1200],")
     checkAnswer(df13, Seq(Row(1, "cathy", 9000, 1200, false),
       Row(2, "alex", 12000, 1200, false), Row(6, "jen", 12000, 1200, true)))
 
@@ -903,8 +916,10 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .filter(radians($"bonus") > 20)
       .filter(signum($"bonus") === 1)
     checkFiltersRemoved(df15)
-    checkPushedInfo(df15, "PushedFilters: [BONUS IS NOT NULL, (LOG(2.0, BONUS)) > 10.0, " +
-      "LOG10(BONUS) > 3.0, (ROUND(BONUS, 0)) = 1200.0, DEG...,")
+    checkPushedInfo(df15, "PushedFilters: " +
+      "[BONUS IS NOT NULL, (LOG(2.0, BONUS)) > 10.0, LOG10(BONUS) > 3.0, " +
+      "(ROUND(BONUS, 0)) = 1200.0, DEGREES(BONUS) > 68754.0, RADIANS(BONUS) > 20.0, " +
+      "SIGN(BONUS) = 1.0],")
     checkAnswer(df15, Seq(Row(1, "cathy", 9000, 1200, false),
       Row(2, "alex", 12000, 1200, false), Row(6, "jen", 12000, 1200, true)))
 
@@ -921,8 +936,10 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .filter(atan($"bonus") > 1.4)
       .filter(atan2($"bonus", $"bonus") > 0.7)
     checkFiltersRemoved(df16)
-    checkPushedInfo(df16, "PushedFilters: [BONUS IS NOT NULL, SIN(BONUS) < -0.08, " +
-      "SINH(BONUS) > 200.0, COS(BONUS) > 0.9, COSH(BONUS) > 200....,")
+    checkPushedInfo(df16, "PushedFilters: [" +
+      "BONUS IS NOT NULL, SIN(BONUS) < -0.08, SINH(BONUS) > 200.0, COS(BONUS) > 0.9, " +
+      "COSH(BONUS) > 200.0, TAN(BONUS) < -0.08, TANH(BONUS) = 1.0, COT(BONUS) < -11.0, " +
+      "ASIN(BONUS) > 0.1, ACOS(BONUS) > 1.4, ATAN(BONUS) > 1.4, (ATAN2(BONUS, BONUS)) > 0.7],")
     checkAnswer(df16, Seq(Row(1, "cathy", 9000, 1200, false),
       Row(2, "alex", 12000, 1200, false), Row(6, "jen", 12000, 1200, true)))
 
@@ -1025,18 +1042,92 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
             |AND cast(dept as short) > 1
             |AND cast(bonus as decimal(20, 2)) > 1200""".stripMargin)
         checkFiltersRemoved(df6, ansiMode)
-        val expectedPlanFragment8 = if (ansiMode) {
+        val expectedPlanFragment6 = if (ansiMode) {
           "PushedFilters: [BONUS IS NOT NULL, DEPT IS NOT NULL, " +
-            "CAST(BONUS AS string) LIKE '%30%', CAST(DEPT AS byte) > 1, ...,"
+            "CAST(BONUS AS string) LIKE '%30%', CAST(DEPT AS byte) > 1, " +
+            "CAST(DEPT AS short) > 1, CAST(BONUS AS decimal(20,2)) > 1200.00]"
         } else {
           "PushedFilters: [BONUS IS NOT NULL, DEPT IS NOT NULL],"
         }
-        checkPushedInfo(df6, expectedPlanFragment8)
+        checkPushedInfo(df6, expectedPlanFragment6)
         checkAnswer(df6, Seq(Row(2, "david", 10000, 1300, true)))
       }
     }
   }
 
+  test("scan with filter push-down with date time functions") {
+    val df1 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "dayofyear(date1) > 100 AND dayofmonth(date1) > 10 ")
+    checkFiltersRemoved(df1)
+    val expectedPlanFragment1 =
+      "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(DAY_OF_YEAR FROM DATE1) > 100, " +
+        "EXTRACT(DAY FROM DATE1) > 10]"
+    checkPushedInfo(df1, expectedPlanFragment1)
+    checkAnswer(df1, Seq(Row("amy"), Row("alex")))
+
+    val df2 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "year(date1) = 2022 AND quarter(date1) = 2")
+    checkFiltersRemoved(df2)
+    val expectedPlanFragment2 =
+      "[DATE1 IS NOT NULL, EXTRACT(YEAR FROM DATE1) = 2022, " +
+        "EXTRACT(QUARTER FROM DATE1) = 2]"
+    checkPushedInfo(df2, expectedPlanFragment2)
+    checkAnswer(df2, Seq(Row("amy"), Row("alex")))
+
+    val df3 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "second(time1) = 0 AND month(date1) = 5")
+    checkFiltersRemoved(df3)
+    val expectedPlanFragment3 =
+      "PushedFilters: [TIME1 IS NOT NULL, DATE1 IS NOT NULL, " +
+        "EXTRACT(SECOND FROM TIME1) = 0, EXTRACT(MONTH FROM DATE1) = 5]"
+    checkPushedInfo(df3, expectedPlanFragment3)
+    checkAnswer(df3, Seq(Row("amy"), Row("alex")))
+
+    val df4 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "hour(time1) = 0 AND minute(time1) = 0")
+    checkFiltersRemoved(df4)
+    val expectedPlanFragment4 =
+      "PushedFilters: [TIME1 IS NOT NULL, EXTRACT(HOUR FROM TIME1) = 0, " +
+        "EXTRACT(MINUTE FROM TIME1) = 0]"
+    checkPushedInfo(df4, expectedPlanFragment4)
+    checkAnswer(df4, Seq(Row("amy"), Row("alex")))
+
+    val df5 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "extract(WEEk from date1) > 10 AND extract(YEAROFWEEK from date1) = 2022")
+    checkFiltersRemoved(df5)
+    val expectedPlanFragment5 =
+      "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(WEEK FROM DATE1) > 10, " +
+        "EXTRACT(YEAR_OF_WEEK FROM DATE1) = 2022]"
+    checkPushedInfo(df5, expectedPlanFragment5)
+    checkAnswer(df5, Seq(Row("alex"), Row("amy")))
+
+    // H2 does not support TRUNC, DATE_ADD or DATEDIFF push-down, so these filters remain in Spark.
+    val df6 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "trunc(date1, 'week') = date'2022-05-16' AND date_add(date1, 1) = date'2022-05-20' " +
+      "AND datediff(date1, '2022-05-10') > 0")
+    checkFiltersRemoved(df6, false)
+    val expectedPlanFragment6 =
+      "PushedFilters: [DATE1 IS NOT NULL]"
+    checkPushedInfo(df6, expectedPlanFragment6)
+    checkAnswer(df6, Seq(Row("amy")))
+
+    val df7 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "weekday(date1) = 2")
+    checkFiltersRemoved(df7)
+    val expectedPlanFragment7 =
+      "PushedFilters: [DATE1 IS NOT NULL, (EXTRACT(DAY_OF_WEEK FROM DATE1) - 1) = 2]"
+    checkPushedInfo(df7, expectedPlanFragment7)
+    checkAnswer(df7, Seq(Row("alex")))
+
+    val df8 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "dayofweek(date1) = 4")
+    checkFiltersRemoved(df8)
+    val expectedPlanFragment8 =
+      "PushedFilters: [DATE1 IS NOT NULL, ((EXTRACT(DAY_OF_WEEK FROM DATE1) % 7) + 1) = 4]"
+    checkPushedInfo(df8, expectedPlanFragment8)
+    checkAnswer(df8, Seq(Row("alex")))
+  }
+
   test("scan with filter push-down with UDF") {
     JdbcDialects.unregisterDialect(H2Dialect)
     try {
@@ -1115,7 +1206,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkAnswer(sql("SHOW TABLES IN h2.test"),
       Seq(Row("test", "people", false), Row("test", "empty_table", false),
         Row("test", "employee", false), Row("test", "item", false), Row("test", "dept", false),
-        Row("test", "person", false), Row("test", "view1", false), Row("test", "view2", false)))
+        Row("test", "person", false), Row("test", "view1", false), Row("test", "view2", false),
+        Row("test", "datetime", false)))
   }
 
   test("SQL API: create table as select") {
@@ -1213,7 +1305,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkFiltersRemoved(df2)
     val expectedPlanFragment2 =
       "PushedFilters: [NAME IS NOT NULL, TRIM(BOTH FROM NAME) = 'jen', " +
-      "(TRIM(BOTH 'j' FROM NAME)) = 'en', (TRANSLATE(NA..."
+      "(TRIM(BOTH 'j' FROM NAME)) = 'en', (TRANSLATE(NAME, 'e', '1')) = 'j1n']"
     checkPushedInfo(df2, expectedPlanFragment2)
     checkAnswer(df2, Seq(Row(6, "jen", 12000, 1200, true)))
 
@@ -1803,10 +1895,20 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       """.stripMargin)
     checkAggregateRemoved(df)
     checkPushedInfo(df,
-      "PushedAggregates: [COUNT(CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00)" +
-      " THEN SALARY ELSE 0.00 END), COUNT(CAS..., " +
-      "PushedFilters: [], " +
-      "PushedGroupByExpressions: [DEPT], ")
+      "PushedAggregates: " +
+        "[COUNT(CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END), " +
+        "COUNT(CASE WHEN (SALARY > 8000.00) AND (SALARY <= 13000.00) THEN SALARY ELSE 0.00 END), " +
+        "COUNT(CASE WHEN (SALARY > 11000.00) OR (SALARY < 10000.00) THEN SALARY ELSE 0.00 END), " +
+        "COUNT(CASE WHEN (SALARY >= 12000.00) OR (SALARY < 9000.00) THEN SALARY ELSE 0.00 END), " +
+        "MAX(CASE WHEN (SALARY <= 10000.00) AND (SALARY >= 8000.00) THEN SALARY ELSE 0.00 END), " +
+        "MAX(CASE WHEN (SALARY <= 9000.00) OR (SALARY > 10000.00) THEN SALARY ELSE 0.00 END), " +
+        "MAX(CASE WHEN (SALARY = 0.00) OR (SALARY >= 8000.00) THEN SALARY ELSE 0.00 END), " +
+        "MAX(CASE WHEN (SALARY <= 8000.00) OR (SALARY >= 10000.00) THEN 0.00 ELSE SALARY END), " +
+        "MIN(CASE WHEN (SALARY <= 8000.00) AND (SALARY IS NOT NULL) THEN SALARY ELSE 0.00 END), " +
+        "SUM(CASE WHEN SALARY > 10000.00 THEN 2 WHEN SALARY > 8000.00 THEN 1 END), " +
+        "AVG(CASE WHEN (SALARY <= 8000.00) AND (SALARY IS NULL) THEN SALARY ELSE 0.00 END)], " +
+        "PushedFilters: [], " +
+        "PushedGroupByExpressions: [DEPT],")
     checkAnswer(df, Seq(Row(1, 1, 1, 1, 1, 0d, 12000d, 0d, 12000d, 0d, 0d, 2, 0d),
       Row(2, 2, 2, 2, 2, 10000d, 12000d, 10000d, 12000d, 0d, 0d, 3, 0d),
       Row(2, 2, 2, 2, 2, 10000d, 9000d, 10000d, 10000d, 9000d, 0d, 2, 0d)))
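
From a user session the push-down is visible in the query plan; a sketch mirroring the new test's expectations:

```scala
// With the H2 catalog registered as in JDBCV2Suite:
val df = spark.sql("SELECT name FROM h2.test.datetime WHERE year(date1) = 2022")
df.explain()
// The scan node should report something like:
//   PushedFilters: [DATE1 IS NOT NULL, EXTRACT(YEAR FROM DATE1) = 2022]
```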


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org