Posted to commits@spark.apache.org by we...@apache.org on 2021/07/14 07:40:08 UTC
[spark] branch branch-3.2 updated: [SPARK-36037][SQL] Support ANSI SQL LOCALTIMESTAMP datetime value function
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 1686cff9 [SPARK-36037][SQL] Support ANSI SQL LOCALTIMESTAMP datetime value function
1686cff9 is described below
commit 1686cff9a15311b1ab909996186871e0b56aab91
Author: gengjiaan <ge...@360.cn>
AuthorDate: Wed Jul 14 15:38:46 2021 +0800
[SPARK-36037][SQL] Support ANSI SQL LOCALTIMESTAMP datetime value function
### What changes were proposed in this pull request?
`LOCALTIMESTAMP()` is a datetime value function from ANSI SQL.
The syntax is shown below:
```
<datetime value function> ::=
    <current date value function>
  | <current time value function>
  | <current timestamp value function>
  | <current local time value function>
  | <current local timestamp value function>

<current date value function> ::=
    CURRENT_DATE

<current time value function> ::=
    CURRENT_TIME [ <left paren> <time precision> <right paren> ]

<current local time value function> ::=
    LOCALTIME [ <left paren> <time precision> <right paren> ]

<current timestamp value function> ::=
    CURRENT_TIMESTAMP [ <left paren> <timestamp precision> <right paren> ]

<current local timestamp value function> ::=
    LOCALTIMESTAMP [ <left paren> <timestamp precision> <right paren> ]
```
`LOCALTIMESTAMP()` returns the current timestamp at the start of query evaluation as TIMESTAMP WITHOUT TIME ZONE, similar to `CURRENT_TIMESTAMP()`.
Note that the optimization rule `ComputeCurrentTime` is updated so that Spark returns the same result within a single query even when the function is called multiple times, as the sketch below illustrates.
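As a quick illustration (a sketch only, assuming a running `SparkSession` named `spark` on a build that contains this change), repeated calls inside one query fold to the same literal:
```scala
// Sketch: assumes a SparkSession `spark` on Spark 3.2.0+ with this patch.
// ComputeCurrentTime replaces both calls with the same literal at
// optimization time, so the comparison is expected to be true.
spark.sql("SELECT localtimestamp() = localtimestamp() AS same").show()
// +----+
// |same|
// +----+
// |true|
// +----+
```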
### Why are the changes needed?
`CURRENT_TIMESTAMP()` returns the current timestamp at the start of query evaluation.
`LOCALTIMESTAMP()` returns the current timestamp without time zone at the start of query evaluation.
The `LOCALTIMESTAMP` function is part of the ANSI SQL standard and is widely supported by other databases, so adding it improves Spark's standard compliance; the sketch below contrasts the result types of the two functions.
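Under the same assumptions as above, `typeof` can show the difference (the column aliases are illustrative):
```scala
// Sketch: CURRENT_TIMESTAMP yields TIMESTAMP (session time zone semantics),
// while LOCALTIMESTAMP yields TIMESTAMP WITHOUT TIME ZONE.
spark.sql(
  "SELECT typeof(current_timestamp()) AS ts, typeof(localtimestamp()) AS ts_ntz"
).show(false)
// Expected: ts = timestamp, ts_ntz = timestamp_ntz
```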
### Does this PR introduce _any_ user-facing change?
Yes. This PR adds the new SQL function `LOCALTIMESTAMP()` and a matching Scala API, `functions.localtimestamp()`; a usage sketch follows.
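A minimal sketch of the new Scala API (the alias `now_ntz` is illustrative):
```scala
import org.apache.spark.sql.functions.localtimestamp

// The new column function returns the query-start timestamp as
// TIMESTAMP WITHOUT TIME ZONE; the optimizer folds it to a single
// literal, so every row sees the same value.
val df = spark.range(3).select(localtimestamp().as("now_ntz"))
df.printSchema()
// Expected:
// root
//  |-- now_ntz: timestamp_ntz (nullable = false)
```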
### How was this patch tested?
New tests.
Closes #33258 from beliefer/SPARK-36037.
Lead-authored-by: gengjiaan <ge...@360.cn>
Co-authored-by: Jiaan Geng <be...@163.com>
Co-authored-by: Wenchen Fan <cl...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit b4f7758944505c7c957e2e0c2c70da5ea746099b)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/catalyst/analysis/FunctionRegistry.scala | 1 +
.../analysis/UnsupportedOperationChecker.scala | 4 +-
.../catalyst/expressions/datetimeExpressions.scala | 42 ++++++++-
.../sql/catalyst/optimizer/finishAnalysis.scala | 10 +-
.../expressions/DateExpressionsSuite.scala | 9 ++
.../optimizer/ComputeCurrentTimeSuite.scala | 25 ++++-
.../execution/streaming/MicroBatchExecution.scala | 5 +-
.../streaming/continuous/ContinuousExecution.scala | 8 +-
.../scala/org/apache/spark/sql/functions.scala | 10 ++
.../sql-functions/sql-expression-schema.md | 3 +-
.../test/resources/sql-tests/inputs/datetime.sql | 1 +
.../sql-tests/results/ansi/datetime.sql.out | 10 +-
.../sql-tests/results/datetime-legacy.sql.out | 10 +-
.../resources/sql-tests/results/datetime.sql.out | 10 +-
.../results/timestampNTZ/datetime.sql.out | 10 +-
.../sql/expressions/ExpressionInfoSuite.scala | 1 +
.../sql/streaming/StreamingAggregationSuite.scala | 102 +++++++++++----------
17 files changed, 196 insertions(+), 65 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index d518bf3..60ca1e9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -519,6 +519,7 @@ object FunctionRegistry {
expression[CurrentDate]("current_date"),
expression[CurrentTimestamp]("current_timestamp"),
expression[CurrentTimeZone]("current_timezone"),
+ expression[LocalTimestamp]("localtimestamp"),
expression[DateDiff]("datediff"),
expression[DateAdd]("date_add"),
expression[DateFormatClass]("date_format"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 8629300..13c7f75 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, CurrentDate, CurrentTimestamp, GroupingSets, MonotonicallyIncreasingID, Now}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, CurrentDate, CurrentTimestampLike, GroupingSets, LocalTimestamp, MonotonicallyIncreasingID}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -417,7 +417,7 @@ object UnsupportedOperationChecker extends Logging {
subPlan.expressions.foreach { e =>
if (e.collectLeaves().exists {
- case (_: CurrentTimestamp | _: Now | _: CurrentDate) => true
+ case (_: CurrentTimestampLike | _: CurrentDate | _: LocalTimestamp) => true
case _ => false
}) {
throwError(s"Continuous processing does not support current time operations.")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index ca8dea8..1146ba7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.expressions
import java.text.ParseException
-import java.time.{DateTimeException, LocalDate, LocalDateTime, ZoneId}
+import java.time.{DateTimeException, LocalDate, LocalDateTime, ZoneId, ZoneOffset}
import java.time.format.DateTimeParseException
import java.util.Locale
@@ -201,6 +201,44 @@ case class Now() extends CurrentTimestampLike {
}
/**
+ * Returns the current timestamp without time zone at the start of query evaluation.
+ * There is no code generation since this expression should get constant folded by the optimizer.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+ usage = """
+ _FUNC_() - Returns the current timestamp without time zone at the start of query evaluation. All calls of localtimestamp within the same query return the same value.
+
+ _FUNC_ - Returns the current local date-time at the session time zone at the start of query evaluation.
+ """,
+ examples = """
+ Examples:
+ > SELECT _FUNC_();
+ 2020-04-25 15:49:11.914
+ """,
+ group = "datetime_funcs",
+ since = "3.2.0")
+case class LocalTimestamp(timeZoneId: Option[String] = None) extends LeafExpression
+ with TimeZoneAwareExpression with CodegenFallback {
+
+ def this() = this(None)
+
+ override def foldable: Boolean = true
+ override def nullable: Boolean = false
+
+ override def dataType: DataType = TimestampNTZType
+
+ final override def nodePatternsInternal(): Seq[TreePattern] = Seq(CURRENT_LIKE)
+
+ override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+ copy(timeZoneId = Option(timeZoneId))
+
+ override def eval(input: InternalRow): Any = localDateTimeToMicros(LocalDateTime.now(zoneId))
+
+ override def prettyName: String = "localtimestamp"
+}
+
+/**
* Expression representing the current batch time, which is used by StreamExecution to
* 1. prevent optimizer from pushing this expression below a stateful operator
* 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
@@ -236,6 +274,8 @@ case class CurrentBatchTimestamp(
val timestampUs = millisToMicros(timestampMs)
dataType match {
case _: TimestampType => Literal(timestampUs, TimestampType)
+ case _: TimestampNTZType =>
+ Literal(convertTz(timestampUs, ZoneOffset.UTC, zoneId), TimestampNTZType)
case _: DateType => Literal(microsToDays(timestampUs, zoneId), DateType)
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index 116ccde..deacc3b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.trees.TreePattern._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -81,16 +80,19 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
val timestamp = timeExpr.eval(EmptyRow).asInstanceOf[Long]
val currentTime = Literal.create(timestamp, timeExpr.dataType)
val timezone = Literal.create(conf.sessionLocalTimeZone, StringType)
+ val localTimestamps = mutable.Map.empty[String, Literal]
plan.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) {
case currentDate @ CurrentDate(Some(timeZoneId)) =>
currentDates.getOrElseUpdate(timeZoneId, {
- Literal.create(
- DateTimeUtils.microsToDays(timestamp, currentDate.zoneId),
- DateType)
+ Literal.create(currentDate.eval().asInstanceOf[Int], DateType)
})
case CurrentTimestamp() | Now() => currentTime
case CurrentTimeZone() => timezone
+ case localTimestamp @ LocalTimestamp(Some(timeZoneId)) =>
+ localTimestamps.getOrElseUpdate(timeZoneId, {
+ Literal.create(localTimestamp.eval().asInstanceOf[Long], TimestampNTZType)
+ })
}
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index f288d82..93fc775 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -96,6 +96,15 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
assert(math.abs(t1 - ct.getTime) < 5000)
}
+ test("datetime function localtimestamp") {
+ outstandingTimezonesIds.foreach { zid =>
+ val ct = LocalTimestamp(Some(zid)).eval(EmptyRow).asInstanceOf[Long]
+ val t1 = DateTimeUtils.localDateTimeToMicros(
+ LocalDateTime.now(DateTimeUtils.getZoneId(zid)))
+ assert(math.abs(t1 - ct) < 5000)
+ }
+ }
+
test("DayOfYear") {
val sdfDay = new SimpleDateFormat("D", Locale.US)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
index 82d6757..9b04dcd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
@@ -17,10 +17,10 @@
package org.apache.spark.sql.catalyst.optimizer
-import java.time.ZoneId
+import java.time.{LocalDateTime, ZoneId}
import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, CurrentTimeZone, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, CurrentTimeZone, Literal, LocalTimestamp}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -81,4 +81,25 @@ class ComputeCurrentTimeSuite extends PlanTest {
assert(lits.size == 1)
assert(lits.head == SQLConf.get.sessionLocalTimeZone)
}
+
+ test("analyzer should replace localtimestamp with literals") {
+ val in = Project(Seq(Alias(LocalTimestamp(), "a")(), Alias(LocalTimestamp(), "b")()),
+ LocalRelation())
+
+ val zoneId = DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)
+
+ val min = DateTimeUtils.localDateTimeToMicros(LocalDateTime.now(zoneId))
+ val plan = Optimize.execute(in.analyze).asInstanceOf[Project]
+ val max = DateTimeUtils.localDateTimeToMicros(LocalDateTime.now(zoneId))
+
+ val lits = new scala.collection.mutable.ArrayBuffer[Long]
+ plan.transformAllExpressions { case e: Literal =>
+ lits += e.value.asInstanceOf[Long]
+ e
+ }
+ assert(lits.size == 2)
+ assert(lits(0) >= min && lits(0) <= max)
+ assert(lits(1) >= min && lits(1) <= max)
+ assert(lits(0) == lits(1))
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 1ffcaf5..c31307f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.{Map => MutableMap}
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp, LocalTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream}
import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
@@ -558,6 +558,9 @@ class MicroBatchExecution(
// dummy string to prevent UnresolvedException and to prevent to be used in the future.
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
ct.dataType, Some("Dummy TimeZoneId"))
+ case lt: LocalTimestamp =>
+ CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
+ lt.dataType, lt.timeZoneId)
case cd: CurrentDate =>
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
cd.dataType, cd.timeZoneId)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 7c52b80..5e40860 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.{Map => MutableMap}
import org.apache.spark.SparkEnv
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestampLike, LocalTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream}
import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
@@ -172,9 +172,9 @@ class ContinuousExecution(
}
withNewSources.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) {
- case (_: CurrentTimestamp | _: CurrentDate) =>
- throw new IllegalStateException(
- "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
+ case (_: CurrentTimestampLike | _: CurrentDate | _: LocalTimestamp) =>
+ throw new IllegalStateException("CurrentTimestamp, Now, CurrentDate and LocalTimestamp" +
+ " not yet supported for continuous processing")
}
reportTimeTaken("queryPlanning") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 77e4a67..3b39d97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2975,6 +2975,16 @@ object functions {
def current_timestamp(): Column = withExpr { CurrentTimestamp() }
/**
+ * Returns the current timestamp without time zone at the start of query evaluation
+ * as a timestamp without time zone column.
+ * All calls of localtimestamp within the same query return the same value.
+ *
+ * @group datetime_funcs
+ * @since 3.2.0
+ */
+ def localtimestamp(): Column = withExpr { LocalTimestamp() }
+
+ /**
* Converts a date/timestamp/string to a value of string in the format specified by the date
* format given by the second argument.
*
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index f71f3a8..c13a1d4 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -1,6 +1,6 @@
<!-- Automatically generated by ExpressionsSchemaSuite -->
## Summary
- - Number of queries: 359
+ - Number of queries: 360
- Number of expressions that missing example: 13
- Expressions missing examples: bigint,binary,boolean,date,decimal,double,float,int,smallint,string,timestamp,tinyint,window
## Schema of Built-in Functions
@@ -162,6 +162,7 @@
| org.apache.spark.sql.catalyst.expressions.LessThanOrEqual | <= | SELECT 2 <= 2 | struct<(2 <= 2):boolean> |
| org.apache.spark.sql.catalyst.expressions.Levenshtein | levenshtein | SELECT levenshtein('kitten', 'sitting') | struct<levenshtein(kitten, sitting):int> |
| org.apache.spark.sql.catalyst.expressions.Like | like | SELECT like('Spark', '_park') | struct<Spark LIKE _park:boolean> |
+| org.apache.spark.sql.catalyst.expressions.LocalTimestamp | localtimestamp | SELECT localtimestamp() | struct<localtimestamp():timestamp_ntz> |
| org.apache.spark.sql.catalyst.expressions.Log | ln | SELECT ln(1) | struct<ln(1):double> |
| org.apache.spark.sql.catalyst.expressions.Log10 | log10 | SELECT log10(10) | struct<LOG10(10):double> |
| org.apache.spark.sql.catalyst.expressions.Log1p | log1p | SELECT log1p(0) | struct<LOG1P(0):double> |
diff --git a/sql/core/src/test/resources/sql-tests/inputs/datetime.sql b/sql/core/src/test/resources/sql-tests/inputs/datetime.sql
index db30c22..74a451e 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/datetime.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/datetime.sql
@@ -24,6 +24,7 @@ select DATE_FROM_UNIX_DATE(0), DATE_FROM_UNIX_DATE(1000), DATE_FROM_UNIX_DATE(nu
select UNIX_DATE(DATE('1970-01-01')), UNIX_DATE(DATE('2020-12-04')), UNIX_DATE(null);
-- [SPARK-16836] current_date and current_timestamp literals
select current_date = current_date(), current_timestamp = current_timestamp();
+select localtimestamp() = localtimestamp();
select to_date(null), to_date('2016-12-31'), to_date('2016-12-31', 'yyyy-MM-dd');
diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out
index beff665..4e999f3 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 205
+-- Number of queries: 206
-- !query
@@ -142,6 +142,14 @@ select current_date = current_date(), current_timestamp = current_timestamp()
-- !query
+select localtimestamp() = localtimestamp()
+-- !query schema
+struct<(localtimestamp() = localtimestamp()):boolean>
+-- !query output
+true
+
+
+-- !query
select to_date(null), to_date('2016-12-31'), to_date('2016-12-31', 'yyyy-MM-dd')
-- !query schema
struct<to_date(NULL):date,to_date(2016-12-31):date,to_date(2016-12-31, yyyy-MM-dd):date>
diff --git a/sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out b/sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out
index b6fe551..2108808 100644
--- a/sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 205
+-- Number of queries: 206
-- !query
@@ -136,6 +136,14 @@ true true
-- !query
+select localtimestamp() = localtimestamp()
+-- !query schema
+struct<(localtimestamp() = localtimestamp()):boolean>
+-- !query output
+true
+
+
+-- !query
select to_date(null), to_date('2016-12-31'), to_date('2016-12-31', 'yyyy-MM-dd')
-- !query schema
struct<to_date(NULL):date,to_date(2016-12-31):date,to_date(2016-12-31, yyyy-MM-dd):date>
diff --git a/sql/core/src/test/resources/sql-tests/results/datetime.sql.out b/sql/core/src/test/resources/sql-tests/results/datetime.sql.out
index c865f2c..f6278f6 100755
--- a/sql/core/src/test/resources/sql-tests/results/datetime.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/datetime.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 205
+-- Number of queries: 206
-- !query
@@ -136,6 +136,14 @@ true true
-- !query
+select localtimestamp() = localtimestamp()
+-- !query schema
+struct<(localtimestamp() = localtimestamp()):boolean>
+-- !query output
+true
+
+
+-- !query
select to_date(null), to_date('2016-12-31'), to_date('2016-12-31', 'yyyy-MM-dd')
-- !query schema
struct<to_date(NULL):date,to_date(2016-12-31):date,to_date(2016-12-31, yyyy-MM-dd):date>
diff --git a/sql/core/src/test/resources/sql-tests/results/timestampNTZ/datetime.sql.out b/sql/core/src/test/resources/sql-tests/results/timestampNTZ/datetime.sql.out
index a5659e1..69c4b8f 100644
--- a/sql/core/src/test/resources/sql-tests/results/timestampNTZ/datetime.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/timestampNTZ/datetime.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 205
+-- Number of queries: 206
-- !query
@@ -136,6 +136,14 @@ true true
-- !query
+select localtimestamp() = localtimestamp()
+-- !query schema
+struct<(localtimestamp() = localtimestamp()):boolean>
+-- !query output
+true
+
+
+-- !query
select to_date(null), to_date('2016-12-31'), to_date('2016-12-31', 'yyyy-MM-dd')
-- !query schema
struct<to_date(NULL):date,to_date(2016-12-31):date,to_date(2016-12-31, yyyy-MM-dd):date>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
index 589ac51..30ee97a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
@@ -175,6 +175,7 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession {
"org.apache.spark.sql.catalyst.expressions.CurrentTimestamp",
"org.apache.spark.sql.catalyst.expressions.CurrentTimeZone",
"org.apache.spark.sql.catalyst.expressions.Now",
+ "org.apache.spark.sql.catalyst.expressions.LocalTimestamp",
// Random output without a seed
"org.apache.spark.sql.catalyst.expressions.Rand",
"org.apache.spark.sql.catalyst.expressions.Randn",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index eef13ca..77334ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode._
import org.apache.spark.sql.streaming.util.{MockSourceProvider, StreamManualClock}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
import org.apache.spark.util.Utils
@@ -406,56 +406,66 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
)
}
- testWithAllStateVersions("prune results by current_time, complete mode") {
+ testWithAllStateVersions("prune results by current_time or localtimestamp, complete mode") {
import testImplicits._
- val clock = new StreamManualClock
- val inputData = MemoryStream[Long]
- val aggregated =
- inputData.toDF()
+ val inputDataOne = MemoryStream[Long]
+ val aggregatedOne =
+ inputDataOne.toDF()
.groupBy($"value")
.agg(count("*"))
.where('value >= current_timestamp().cast("long") - 10L)
+ val inputDataTwo = MemoryStream[Long]
+ val aggregatedTwo =
+ inputDataTwo.toDF()
+ .groupBy($"value")
+ .agg(count("*"))
+ .where('value >= localtimestamp().cast(TimestampType).cast("long") - 10L)
+
+ Seq((inputDataOne, aggregatedOne), (inputDataTwo, aggregatedTwo)).foreach { x =>
+ val inputData = x._1
+ val aggregated = x._2
+ val clock = new StreamManualClock
+ testStream(aggregated, Complete)(
+ StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
+
+ // advance clock to 10 seconds, all keys retained
+ AddData(inputData, 0L, 5L, 5L, 10L),
+ AdvanceManualClock(10 * 1000),
+ CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
+
+ // advance clock to 20 seconds, should retain keys >= 10
+ AddData(inputData, 15L, 15L, 20L),
+ AdvanceManualClock(10 * 1000),
+ CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
+
+ // advance clock to 30 seconds, should retain keys >= 20
+ AddData(inputData, 0L, 85L),
+ AdvanceManualClock(10 * 1000),
+ CheckLastBatch((20L, 1), (85L, 1)),
+
+ // bounce stream and ensure correct batch timestamp is used
+ // i.e., we don't take it from the clock, which is at 90 seconds.
+ StopStream,
+ AssertOnQuery { q => // clear the sink
+ q.sink.asInstanceOf[MemorySink].clear()
+ q.commitLog.purge(3)
+ // advance by a minute i.e., 90 seconds total
+ clock.advance(60 * 1000L)
+ true
+ },
+ StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
+ // The commit log blown, causing the last batch to re-run
+ CheckLastBatch((20L, 1), (85L, 1)),
+ AssertOnQuery { q =>
+ clock.getTimeMillis() == 90000L
+ },
- testStream(aggregated, Complete)(
- StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
-
- // advance clock to 10 seconds, all keys retained
- AddData(inputData, 0L, 5L, 5L, 10L),
- AdvanceManualClock(10 * 1000),
- CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
-
- // advance clock to 20 seconds, should retain keys >= 10
- AddData(inputData, 15L, 15L, 20L),
- AdvanceManualClock(10 * 1000),
- CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
-
- // advance clock to 30 seconds, should retain keys >= 20
- AddData(inputData, 0L, 85L),
- AdvanceManualClock(10 * 1000),
- CheckLastBatch((20L, 1), (85L, 1)),
-
- // bounce stream and ensure correct batch timestamp is used
- // i.e., we don't take it from the clock, which is at 90 seconds.
- StopStream,
- AssertOnQuery { q => // clear the sink
- q.sink.asInstanceOf[MemorySink].clear()
- q.commitLog.purge(3)
- // advance by a minute i.e., 90 seconds total
- clock.advance(60 * 1000L)
- true
- },
- StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
- // The commit log blown, causing the last batch to re-run
- CheckLastBatch((20L, 1), (85L, 1)),
- AssertOnQuery { q =>
- clock.getTimeMillis() == 90000L
- },
-
- // advance clock to 100 seconds, should retain keys >= 90
- AddData(inputData, 85L, 90L, 100L, 105L),
- AdvanceManualClock(10 * 1000),
- CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
- )
+ // advance clock to 100 seconds, should retain keys >= 90
+ AddData(inputData, 85L, 90L, 100L, 105L),
+ AdvanceManualClock(10 * 1000),
+ CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
+ )
+ }
}
testWithAllStateVersions("prune results by current_date, complete mode") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org