You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/07/14 07:40:08 UTC

[spark] branch branch-3.2 updated: [SPARK-36037][SQL] Support ANSI SQL LOCALTIMESTAMP datetime value function

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 1686cff9 [SPARK-36037][SQL] Support ANSI SQL LOCALTIMESTAMP datetime value function
1686cff9 is described below

commit 1686cff9a15311b1ab909996186871e0b56aab91
Author: gengjiaan <ge...@360.cn>
AuthorDate: Wed Jul 14 15:38:46 2021 +0800

    [SPARK-36037][SQL] Support ANSI SQL LOCALTIMESTAMP datetime value function
    
    ### What changes were proposed in this pull request?
    `LOCALTIMESTAMP()` is a datetime value function from ANSI SQL.
    The syntax show below:
    ```
    <datetime value function> ::=
        <current date value function>
      | <current time value function>
      | <current timestamp value function>
      | <current local time value function>
      | <current local timestamp value function>
    <current date value function> ::=
    CURRENT_DATE
    <current time value function> ::=
    CURRENT_TIME [ <left paren> <time precision> <right paren> ]
    <current local time value function> ::=
    LOCALTIME [ <left paren> <time precision> <right paren> ]
    <current timestamp value function> ::=
    CURRENT_TIMESTAMP [ <left paren> <timestamp precision> <right paren> ]
    <current local timestamp value function> ::=
    LOCALTIMESTAMP [ <left paren> <timestamp precision> <right paren> ]
    ```
    
    `LOCALTIMESTAMP()` returns the current timestamp at the start of query evaluation as TIMESTAMP WITH OUT TIME ZONE. This is similar to `CURRENT_TIMESTAMP()`.
    Note we need to update the optimization rule `ComputeCurrentTime` so that Spark returns the same result in a single query if the function is called multiple times.
    
    ### Why are the changes needed?
    `CURRENT_TIMESTAMP()` returns the current timestamp at the start of query evaluation.
    `LOCALTIMESTAMP()` returns the current timestamp without time zone at the start of query evaluation.
    The `LOCALTIMESTAMP` function is an ANSI SQL.
    The `LOCALTIMESTAMP` function is very useful.
    
    ### Does this PR introduce _any_ user-facing change?
    'Yes'. Support new function `LOCALTIMESTAMP()`.
    
    ### How was this patch tested?
    New tests.
    
    Closes #33258 from beliefer/SPARK-36037.
    
    Lead-authored-by: gengjiaan <ge...@360.cn>
    Co-authored-by: Jiaan Geng <be...@163.com>
    Co-authored-by: Wenchen Fan <cl...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit b4f7758944505c7c957e2e0c2c70da5ea746099b)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   1 +
 .../analysis/UnsupportedOperationChecker.scala     |   4 +-
 .../catalyst/expressions/datetimeExpressions.scala |  42 ++++++++-
 .../sql/catalyst/optimizer/finishAnalysis.scala    |  10 +-
 .../expressions/DateExpressionsSuite.scala         |   9 ++
 .../optimizer/ComputeCurrentTimeSuite.scala        |  25 ++++-
 .../execution/streaming/MicroBatchExecution.scala  |   5 +-
 .../streaming/continuous/ContinuousExecution.scala |   8 +-
 .../scala/org/apache/spark/sql/functions.scala     |  10 ++
 .../sql-functions/sql-expression-schema.md         |   3 +-
 .../test/resources/sql-tests/inputs/datetime.sql   |   1 +
 .../sql-tests/results/ansi/datetime.sql.out        |  10 +-
 .../sql-tests/results/datetime-legacy.sql.out      |  10 +-
 .../resources/sql-tests/results/datetime.sql.out   |  10 +-
 .../results/timestampNTZ/datetime.sql.out          |  10 +-
 .../sql/expressions/ExpressionInfoSuite.scala      |   1 +
 .../sql/streaming/StreamingAggregationSuite.scala  | 102 +++++++++++----------
 17 files changed, 196 insertions(+), 65 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index d518bf3..60ca1e9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -519,6 +519,7 @@ object FunctionRegistry {
     expression[CurrentDate]("current_date"),
     expression[CurrentTimestamp]("current_timestamp"),
     expression[CurrentTimeZone]("current_timezone"),
+    expression[LocalTimestamp]("localtimestamp"),
     expression[DateDiff]("datediff"),
     expression[DateAdd]("date_add"),
     expression[DateFormatClass]("date_format"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 8629300..13c7f75 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, CurrentDate, CurrentTimestamp, GroupingSets, MonotonicallyIncreasingID, Now}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, CurrentDate, CurrentTimestampLike, GroupingSets, LocalTimestamp, MonotonicallyIncreasingID}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -417,7 +417,7 @@ object UnsupportedOperationChecker extends Logging {
 
       subPlan.expressions.foreach { e =>
         if (e.collectLeaves().exists {
-          case (_: CurrentTimestamp | _: Now | _: CurrentDate) => true
+          case (_: CurrentTimestampLike | _: CurrentDate | _: LocalTimestamp) => true
           case _ => false
         }) {
           throwError(s"Continuous processing does not support current time operations.")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index ca8dea8..1146ba7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import java.text.ParseException
-import java.time.{DateTimeException, LocalDate, LocalDateTime, ZoneId}
+import java.time.{DateTimeException, LocalDate, LocalDateTime, ZoneId, ZoneOffset}
 import java.time.format.DateTimeParseException
 import java.util.Locale
 
@@ -201,6 +201,44 @@ case class Now() extends CurrentTimestampLike {
 }
 
 /**
+ * Returns the current timestamp without time zone at the start of query evaluation.
+ * There is no code generation since this expression should get constant folded by the optimizer.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_() - Returns the current timestamp without time zone at the start of query evaluation. All calls of localtimestamp within the same query return the same value.
+
+    _FUNC_ - Returns the current local date-time at the session time zone at the start of query evaluation.
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_();
+       2020-04-25 15:49:11.914
+  """,
+  group = "datetime_funcs",
+  since = "3.2.0")
+case class LocalTimestamp(timeZoneId: Option[String] = None) extends LeafExpression
+  with TimeZoneAwareExpression with CodegenFallback {
+
+  def this() = this(None)
+
+  override def foldable: Boolean = true
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = TimestampNTZType
+
+  final override def nodePatternsInternal(): Seq[TreePattern] = Seq(CURRENT_LIKE)
+
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+    copy(timeZoneId = Option(timeZoneId))
+
+  override def eval(input: InternalRow): Any = localDateTimeToMicros(LocalDateTime.now(zoneId))
+
+  override def prettyName: String = "localtimestamp"
+}
+
+/**
  * Expression representing the current batch time, which is used by StreamExecution to
  * 1. prevent optimizer from pushing this expression below a stateful operator
  * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
@@ -236,6 +274,8 @@ case class CurrentBatchTimestamp(
     val timestampUs = millisToMicros(timestampMs)
     dataType match {
       case _: TimestampType => Literal(timestampUs, TimestampType)
+      case _: TimestampNTZType =>
+        Literal(convertTz(timestampUs, ZoneOffset.UTC, zoneId), TimestampNTZType)
       case _: DateType => Literal(microsToDays(timestampUs, zoneId), DateType)
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index 116ccde..deacc3b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreePattern._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -81,16 +80,19 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
     val timestamp = timeExpr.eval(EmptyRow).asInstanceOf[Long]
     val currentTime = Literal.create(timestamp, timeExpr.dataType)
     val timezone = Literal.create(conf.sessionLocalTimeZone, StringType)
+    val localTimestamps = mutable.Map.empty[String, Literal]
 
     plan.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) {
       case currentDate @ CurrentDate(Some(timeZoneId)) =>
         currentDates.getOrElseUpdate(timeZoneId, {
-          Literal.create(
-            DateTimeUtils.microsToDays(timestamp, currentDate.zoneId),
-            DateType)
+          Literal.create(currentDate.eval().asInstanceOf[Int], DateType)
         })
       case CurrentTimestamp() | Now() => currentTime
       case CurrentTimeZone() => timezone
+      case localTimestamp @ LocalTimestamp(Some(timeZoneId)) =>
+        localTimestamps.getOrElseUpdate(timeZoneId, {
+          Literal.create(localTimestamp.eval().asInstanceOf[Long], TimestampNTZType)
+        })
     }
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index f288d82..93fc775 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -96,6 +96,15 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     assert(math.abs(t1 - ct.getTime) < 5000)
   }
 
+  test("datetime function localtimestamp") {
+    outstandingTimezonesIds.foreach { zid =>
+      val ct = LocalTimestamp(Some(zid)).eval(EmptyRow).asInstanceOf[Long]
+      val t1 = DateTimeUtils.localDateTimeToMicros(
+        LocalDateTime.now(DateTimeUtils.getZoneId(zid)))
+      assert(math.abs(t1 - ct) < 5000)
+    }
+  }
+
   test("DayOfYear") {
     val sdfDay = new SimpleDateFormat("D", Locale.US)
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
index 82d6757..9b04dcd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import java.time.ZoneId
+import java.time.{LocalDateTime, ZoneId}
 
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, CurrentTimeZone, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, CurrentTimeZone, Literal, LocalTimestamp}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -81,4 +81,25 @@ class ComputeCurrentTimeSuite extends PlanTest {
     assert(lits.size == 1)
     assert(lits.head == SQLConf.get.sessionLocalTimeZone)
   }
+
+  test("analyzer should replace localtimestamp with literals") {
+    val in = Project(Seq(Alias(LocalTimestamp(), "a")(), Alias(LocalTimestamp(), "b")()),
+      LocalRelation())
+
+    val zoneId = DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)
+
+    val min = DateTimeUtils.localDateTimeToMicros(LocalDateTime.now(zoneId))
+    val plan = Optimize.execute(in.analyze).asInstanceOf[Project]
+    val max = DateTimeUtils.localDateTimeToMicros(LocalDateTime.now(zoneId))
+
+    val lits = new scala.collection.mutable.ArrayBuffer[Long]
+    plan.transformAllExpressions { case e: Literal =>
+      lits += e.value.asInstanceOf[Long]
+      e
+    }
+    assert(lits.size == 2)
+    assert(lits(0) >= min && lits(0) <= max)
+    assert(lits(1) >= min && lits(1) <= max)
+    assert(lits(0) == lits(1))
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 1ffcaf5..c31307f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.{Map => MutableMap}
 
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp, LocalTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream}
 import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
@@ -558,6 +558,9 @@ class MicroBatchExecution(
         // dummy string to prevent UnresolvedException and to prevent to be used in the future.
         CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
           ct.dataType, Some("Dummy TimeZoneId"))
+      case lt: LocalTimestamp =>
+        CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
+          lt.dataType, lt.timeZoneId)
       case cd: CurrentDate =>
         CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
           cd.dataType, cd.timeZoneId)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 7c52b80..5e40860 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.{Map => MutableMap}
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestampLike, LocalTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream}
 import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
@@ -172,9 +172,9 @@ class ContinuousExecution(
     }
 
     withNewSources.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) {
-      case (_: CurrentTimestamp | _: CurrentDate) =>
-        throw new IllegalStateException(
-          "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
+      case (_: CurrentTimestampLike | _: CurrentDate | _: LocalTimestamp) =>
+        throw new IllegalStateException("CurrentTimestamp, Now, CurrentDate and LocalTimestamp" +
+          " not yet supported for continuous processing")
     }
 
     reportTimeTaken("queryPlanning") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 77e4a67..3b39d97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2975,6 +2975,16 @@ object functions {
   def current_timestamp(): Column = withExpr { CurrentTimestamp() }
 
   /**
+   * Returns the current timestamp without time zone at the start of query evaluation
+   * as a timestamp without time zone column.
+   * All calls of localtimestamp within the same query return the same value.
+   *
+   * @group datetime_funcs
+   * @since 3.2.0
+   */
+  def localtimestamp(): Column = withExpr { LocalTimestamp() }
+
+  /**
    * Converts a date/timestamp/string to a value of string in the format specified by the date
    * format given by the second argument.
    *
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index f71f3a8..c13a1d4 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -1,6 +1,6 @@
 <!-- Automatically generated by ExpressionsSchemaSuite -->
 ## Summary
-  - Number of queries: 359
+  - Number of queries: 360
   - Number of expressions that missing example: 13
   - Expressions missing examples: bigint,binary,boolean,date,decimal,double,float,int,smallint,string,timestamp,tinyint,window
 ## Schema of Built-in Functions
@@ -162,6 +162,7 @@
 | org.apache.spark.sql.catalyst.expressions.LessThanOrEqual | <= | SELECT 2 <= 2 | struct<(2 <= 2):boolean> |
 | org.apache.spark.sql.catalyst.expressions.Levenshtein | levenshtein | SELECT levenshtein('kitten', 'sitting') | struct<levenshtein(kitten, sitting):int> |
 | org.apache.spark.sql.catalyst.expressions.Like | like | SELECT like('Spark', '_park') | struct<Spark LIKE _park:boolean> |
+| org.apache.spark.sql.catalyst.expressions.LocalTimestamp | localtimestamp | SELECT localtimestamp() | struct<localtimestamp():timestamp_ntz> |
 | org.apache.spark.sql.catalyst.expressions.Log | ln | SELECT ln(1) | struct<ln(1):double> |
 | org.apache.spark.sql.catalyst.expressions.Log10 | log10 | SELECT log10(10) | struct<LOG10(10):double> |
 | org.apache.spark.sql.catalyst.expressions.Log1p | log1p | SELECT log1p(0) | struct<LOG1P(0):double> |
diff --git a/sql/core/src/test/resources/sql-tests/inputs/datetime.sql b/sql/core/src/test/resources/sql-tests/inputs/datetime.sql
index db30c22..74a451e 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/datetime.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/datetime.sql
@@ -24,6 +24,7 @@ select DATE_FROM_UNIX_DATE(0), DATE_FROM_UNIX_DATE(1000), DATE_FROM_UNIX_DATE(nu
 select UNIX_DATE(DATE('1970-01-01')), UNIX_DATE(DATE('2020-12-04')), UNIX_DATE(null);
 -- [SPARK-16836] current_date and current_timestamp literals
 select current_date = current_date(), current_timestamp = current_timestamp();
+select localtimestamp() = localtimestamp();
 
 select to_date(null), to_date('2016-12-31'), to_date('2016-12-31', 'yyyy-MM-dd');
 
diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out
index beff665..4e999f3 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 205
+-- Number of queries: 206
 
 
 -- !query
@@ -142,6 +142,14 @@ select current_date = current_date(), current_timestamp = current_timestamp()
 
 
 -- !query
+select localtimestamp() = localtimestamp()
+-- !query schema
+struct<(localtimestamp() = localtimestamp()):boolean>
+-- !query output
+true
+
+
+-- !query
 select to_date(null), to_date('2016-12-31'), to_date('2016-12-31', 'yyyy-MM-dd')
 -- !query schema
 struct<to_date(NULL):date,to_date(2016-12-31):date,to_date(2016-12-31, yyyy-MM-dd):date>
diff --git a/sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out b/sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out
index b6fe551..2108808 100644
--- a/sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 205
+-- Number of queries: 206
 
 
 -- !query
@@ -136,6 +136,14 @@ true	true
 
 
 -- !query
+select localtimestamp() = localtimestamp()
+-- !query schema
+struct<(localtimestamp() = localtimestamp()):boolean>
+-- !query output
+true
+
+
+-- !query
 select to_date(null), to_date('2016-12-31'), to_date('2016-12-31', 'yyyy-MM-dd')
 -- !query schema
 struct<to_date(NULL):date,to_date(2016-12-31):date,to_date(2016-12-31, yyyy-MM-dd):date>
diff --git a/sql/core/src/test/resources/sql-tests/results/datetime.sql.out b/sql/core/src/test/resources/sql-tests/results/datetime.sql.out
index c865f2c..f6278f6 100755
--- a/sql/core/src/test/resources/sql-tests/results/datetime.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/datetime.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 205
+-- Number of queries: 206
 
 
 -- !query
@@ -136,6 +136,14 @@ true	true
 
 
 -- !query
+select localtimestamp() = localtimestamp()
+-- !query schema
+struct<(localtimestamp() = localtimestamp()):boolean>
+-- !query output
+true
+
+
+-- !query
 select to_date(null), to_date('2016-12-31'), to_date('2016-12-31', 'yyyy-MM-dd')
 -- !query schema
 struct<to_date(NULL):date,to_date(2016-12-31):date,to_date(2016-12-31, yyyy-MM-dd):date>
diff --git a/sql/core/src/test/resources/sql-tests/results/timestampNTZ/datetime.sql.out b/sql/core/src/test/resources/sql-tests/results/timestampNTZ/datetime.sql.out
index a5659e1..69c4b8f 100644
--- a/sql/core/src/test/resources/sql-tests/results/timestampNTZ/datetime.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/timestampNTZ/datetime.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 205
+-- Number of queries: 206
 
 
 -- !query
@@ -136,6 +136,14 @@ true	true
 
 
 -- !query
+select localtimestamp() = localtimestamp()
+-- !query schema
+struct<(localtimestamp() = localtimestamp()):boolean>
+-- !query output
+true
+
+
+-- !query
 select to_date(null), to_date('2016-12-31'), to_date('2016-12-31', 'yyyy-MM-dd')
 -- !query schema
 struct<to_date(NULL):date,to_date(2016-12-31):date,to_date(2016-12-31, yyyy-MM-dd):date>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
index 589ac51..30ee97a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
@@ -175,6 +175,7 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession {
       "org.apache.spark.sql.catalyst.expressions.CurrentTimestamp",
       "org.apache.spark.sql.catalyst.expressions.CurrentTimeZone",
       "org.apache.spark.sql.catalyst.expressions.Now",
+      "org.apache.spark.sql.catalyst.expressions.LocalTimestamp",
       // Random output without a seed
       "org.apache.spark.sql.catalyst.expressions.Rand",
       "org.apache.spark.sql.catalyst.expressions.Randn",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index eef13ca..77334ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.OutputMode._
 import org.apache.spark.sql.streaming.util.{MockSourceProvider, StreamManualClock}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructType, TimestampType}
 import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
 import org.apache.spark.util.Utils
 
@@ -406,56 +406,66 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
     )
   }
 
-  testWithAllStateVersions("prune results by current_time, complete mode") {
+  testWithAllStateVersions("prune results by current_time or localtimestamp, complete mode") {
     import testImplicits._
-    val clock = new StreamManualClock
-    val inputData = MemoryStream[Long]
-    val aggregated =
-      inputData.toDF()
+    val inputDataOne = MemoryStream[Long]
+    val aggregatedOne =
+      inputDataOne.toDF()
         .groupBy($"value")
         .agg(count("*"))
         .where('value >= current_timestamp().cast("long") - 10L)
+    val inputDataTwo = MemoryStream[Long]
+    val aggregatedTwo =
+      inputDataTwo.toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .where('value >= localtimestamp().cast(TimestampType).cast("long") - 10L)
+
+    Seq((inputDataOne, aggregatedOne), (inputDataTwo, aggregatedTwo)).foreach { x =>
+      val inputData = x._1
+      val aggregated = x._2
+      val clock = new StreamManualClock
+      testStream(aggregated, Complete)(
+        StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
+
+        // advance clock to 10 seconds, all keys retained
+        AddData(inputData, 0L, 5L, 5L, 10L),
+        AdvanceManualClock(10 * 1000),
+        CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
+
+        // advance clock to 20 seconds, should retain keys >= 10
+        AddData(inputData, 15L, 15L, 20L),
+        AdvanceManualClock(10 * 1000),
+        CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
+
+        // advance clock to 30 seconds, should retain keys >= 20
+        AddData(inputData, 0L, 85L),
+        AdvanceManualClock(10 * 1000),
+        CheckLastBatch((20L, 1), (85L, 1)),
+
+        // bounce stream and ensure correct batch timestamp is used
+        // i.e., we don't take it from the clock, which is at 90 seconds.
+        StopStream,
+        AssertOnQuery { q => // clear the sink
+          q.sink.asInstanceOf[MemorySink].clear()
+          q.commitLog.purge(3)
+          // advance by a minute i.e., 90 seconds total
+          clock.advance(60 * 1000L)
+          true
+        },
+        StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
+        // The commit log blown, causing the last batch to re-run
+        CheckLastBatch((20L, 1), (85L, 1)),
+        AssertOnQuery { q =>
+          clock.getTimeMillis() == 90000L
+        },
 
-    testStream(aggregated, Complete)(
-      StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
-
-      // advance clock to 10 seconds, all keys retained
-      AddData(inputData, 0L, 5L, 5L, 10L),
-      AdvanceManualClock(10 * 1000),
-      CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
-
-      // advance clock to 20 seconds, should retain keys >= 10
-      AddData(inputData, 15L, 15L, 20L),
-      AdvanceManualClock(10 * 1000),
-      CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
-
-      // advance clock to 30 seconds, should retain keys >= 20
-      AddData(inputData, 0L, 85L),
-      AdvanceManualClock(10 * 1000),
-      CheckLastBatch((20L, 1), (85L, 1)),
-
-      // bounce stream and ensure correct batch timestamp is used
-      // i.e., we don't take it from the clock, which is at 90 seconds.
-      StopStream,
-      AssertOnQuery { q => // clear the sink
-        q.sink.asInstanceOf[MemorySink].clear()
-        q.commitLog.purge(3)
-        // advance by a minute i.e., 90 seconds total
-        clock.advance(60 * 1000L)
-        true
-      },
-      StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
-      // The commit log blown, causing the last batch to re-run
-      CheckLastBatch((20L, 1), (85L, 1)),
-      AssertOnQuery { q =>
-        clock.getTimeMillis() == 90000L
-      },
-
-      // advance clock to 100 seconds, should retain keys >= 90
-      AddData(inputData, 85L, 90L, 100L, 105L),
-      AdvanceManualClock(10 * 1000),
-      CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
-    )
+        // advance clock to 100 seconds, should retain keys >= 90
+        AddData(inputData, 85L, 90L, 100L, 105L),
+        AdvanceManualClock(10 * 1000),
+        CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
+      )
+    }
   }
 
   testWithAllStateVersions("prune results by current_date, complete mode") {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org