You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/02/14 14:29:12 UTC
[spark] branch branch-3.0 updated: [SPARK-30766][SQL] Fix the
timestamp truncation to the `HOUR` and `DAY` levels
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 0a8d7a3 [SPARK-30766][SQL] Fix the timestamp truncation to the `HOUR` and `DAY` levels
0a8d7a3 is described below
commit 0a8d7a35e24acbd7af57fe5169691afb8ccd8675
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Fri Feb 14 22:16:57 2020 +0800
[SPARK-30766][SQL] Fix the timestamp truncation to the `HOUR` and `DAY` levels
### What changes were proposed in this pull request?
This PR proposes using the Java 8 time API for timestamp truncation to the `HOUR` and `DAY` levels. The problem is that the day/hour truncations call `timeZone.getOffset(millis)`, which relies on the hybrid (Julian + Gregorian) calendar underneath.
### Why are the changes needed?
The change fixes incorrect truncations. For example, the following truncation to hours should print `0010-01-01 01:00:00`, but it outputs a wrong timestamp:
```scala
Seq("0010-01-01 01:02:03.123456").toDF()
.select($"value".cast("timestamp").as("ts"))
.select(date_trunc("HOUR", $"ts").cast("string"))
.show(false)
+------------------------------------+
|CAST(date_trunc(HOUR, ts) AS STRING)|
+------------------------------------+
|0010-01-01 01:30:17 |
+------------------------------------+
```
### Does this PR introduce any user-facing change?
Yes. With this change, the example above produces:
```scala
+------------------------------------+
|CAST(date_trunc(HOUR, ts) AS STRING)|
+------------------------------------+
|0010-01-01 01:00:00 |
+------------------------------------+
```
### How was this patch tested?
- Added new test to `DateFunctionsSuite`
- By `DateExpressionsSuite` and `DateTimeUtilsSuite`
Closes #27512 from MaxGekk/fix-trunc-old-timestamp.
Authored-by: Maxim Gekk <ma...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 7137a6d065edeaab97bf5bf49ffaca3d060a14fe)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../catalyst/expressions/datetimeExpressions.scala | 6 +--
.../spark/sql/catalyst/util/DateTimeUtils.scala | 44 +++++++++++-----------
.../sql/catalyst/util/DateTimeUtilsSuite.scala | 39 +++++++++----------
.../org/apache/spark/sql/DateFunctionsSuite.scala | 13 +++++++
4 files changed, 59 insertions(+), 43 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index cf91489..adf7251 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -1690,15 +1690,15 @@ case class TruncTimestamp(
override def eval(input: InternalRow): Any = {
evalHelper(input, minLevel = MIN_LEVEL_OF_TIMESTAMP_TRUNC) { (t: Any, level: Int) =>
- DateTimeUtils.truncTimestamp(t.asInstanceOf[Long], level, timeZone)
+ DateTimeUtils.truncTimestamp(t.asInstanceOf[Long], level, zoneId)
}
}
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
- val tz = ctx.addReferenceObj("timeZone", timeZone)
+ val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
codeGenHelper(ctx, ev, minLevel = MIN_LEVEL_OF_TIMESTAMP_TRUNC, true) {
(date: String, fmt: String) =>
- s"truncTimestamp($date, $fmt, $tz);"
+ s"truncTimestamp($date, $fmt, $zid);"
}
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 01d36f1..ce0c138 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -711,32 +711,34 @@ object DateTimeUtils {
}
}
+ private def truncToUnit(t: SQLTimestamp, zoneId: ZoneId, unit: ChronoUnit): SQLTimestamp = {
+ val truncated = microsToInstant(t).atZone(zoneId).truncatedTo(unit)
+ instantToMicros(truncated.toInstant)
+ }
+
/**
* Returns the trunc date time from original date time and trunc level.
* Trunc level should be generated using `parseTruncLevel()`, should be between 0 and 12.
*/
- def truncTimestamp(t: SQLTimestamp, level: Int, timeZone: TimeZone): SQLTimestamp = {
- if (level == TRUNC_TO_MICROSECOND) return t
- var millis = MICROSECONDS.toMillis(t)
- val truncated = level match {
- case TRUNC_TO_MILLISECOND => millis
- case TRUNC_TO_SECOND =>
- millis - millis % MILLIS_PER_SECOND
- case TRUNC_TO_MINUTE =>
- millis - millis % MILLIS_PER_MINUTE
- case TRUNC_TO_HOUR =>
- val offset = timeZone.getOffset(millis)
- millis += offset
- millis - millis % MILLIS_PER_HOUR - offset
- case TRUNC_TO_DAY =>
- val offset = timeZone.getOffset(millis)
- millis += offset
- millis - millis % MILLIS_PER_DAY - offset
- case _ => // Try to truncate date levels
- val dDays = millisToDays(millis, timeZone.toZoneId)
- daysToMillis(truncDate(dDays, level), timeZone.toZoneId)
+ def truncTimestamp(t: SQLTimestamp, level: Int, zoneId: ZoneId): SQLTimestamp = {
+ level match {
+ case TRUNC_TO_MICROSECOND => t
+ case TRUNC_TO_HOUR => truncToUnit(t, zoneId, ChronoUnit.HOURS)
+ case TRUNC_TO_DAY => truncToUnit(t, zoneId, ChronoUnit.DAYS)
+ case _ =>
+ val millis = MICROSECONDS.toMillis(t)
+ val truncated = level match {
+ case TRUNC_TO_MILLISECOND => millis
+ case TRUNC_TO_SECOND =>
+ millis - millis % MILLIS_PER_SECOND
+ case TRUNC_TO_MINUTE =>
+ millis - millis % MILLIS_PER_MINUTE
+ case _ => // Try to truncate date levels
+ val dDays = millisToDays(millis, zoneId)
+ daysToMillis(truncDate(dDays, level), zoneId)
+ }
+ truncated * MICROS_PER_MILLIS
}
- truncated * MICROS_PER_MILLIS
}
/**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index cd0594c..ff4d8a2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -499,9 +499,9 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
level: Int,
expected: String,
inputTS: SQLTimestamp,
- timezone: TimeZone = DateTimeUtils.defaultTimeZone()): Unit = {
+ zoneId: ZoneId = defaultZoneId): Unit = {
val truncated =
- DateTimeUtils.truncTimestamp(inputTS, level, timezone)
+ DateTimeUtils.truncTimestamp(inputTS, level, zoneId)
val expectedTS = toTimestamp(expected, defaultZoneId)
assert(truncated === expectedTS.get)
}
@@ -539,6 +539,7 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
for (tz <- ALL_TIMEZONES) {
withDefaultTimeZone(tz) {
+ val zid = tz.toZoneId
val inputTS = DateTimeUtils.stringToTimestamp(
UTF8String.fromString("2015-03-05T09:32:05.359"), defaultZoneId)
val inputTS1 = DateTimeUtils.stringToTimestamp(
@@ -552,23 +553,23 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
val inputTS5 = DateTimeUtils.stringToTimestamp(
UTF8String.fromString("1999-03-29T01:02:03.456789"), defaultZoneId)
- testTrunc(DateTimeUtils.TRUNC_TO_YEAR, "2015-01-01T00:00:00", inputTS.get, tz)
- testTrunc(DateTimeUtils.TRUNC_TO_MONTH, "2015-03-01T00:00:00", inputTS.get, tz)
- testTrunc(DateTimeUtils.TRUNC_TO_DAY, "2015-03-05T00:00:00", inputTS.get, tz)
- testTrunc(DateTimeUtils.TRUNC_TO_HOUR, "2015-03-05T09:00:00", inputTS.get, tz)
- testTrunc(DateTimeUtils.TRUNC_TO_MINUTE, "2015-03-05T09:32:00", inputTS.get, tz)
- testTrunc(DateTimeUtils.TRUNC_TO_SECOND, "2015-03-05T09:32:05", inputTS.get, tz)
- testTrunc(DateTimeUtils.TRUNC_TO_WEEK, "2015-03-02T00:00:00", inputTS.get, tz)
- testTrunc(DateTimeUtils.TRUNC_TO_WEEK, "2015-03-30T00:00:00", inputTS1.get, tz)
- testTrunc(DateTimeUtils.TRUNC_TO_WEEK, "2015-03-30T00:00:00", inputTS2.get, tz)
- testTrunc(DateTimeUtils.TRUNC_TO_WEEK, "2015-03-30T00:00:00", inputTS3.get, tz)
- testTrunc(DateTimeUtils.TRUNC_TO_WEEK, "2015-03-23T00:00:00", inputTS4.get, tz)
- testTrunc(DateTimeUtils.TRUNC_TO_QUARTER, "2015-01-01T00:00:00", inputTS.get, tz)
- testTrunc(DateTimeUtils.TRUNC_TO_QUARTER, "2015-01-01T00:00:00", inputTS1.get, tz)
- testTrunc(DateTimeUtils.TRUNC_TO_QUARTER, "2015-04-01T00:00:00", inputTS2.get, tz)
- testTrunc(DateTimeUtils.TRUNC_TO_DECADE, "1990-01-01", inputTS5.get, tz)
- testTrunc(DateTimeUtils.TRUNC_TO_CENTURY, "1901-01-01", inputTS5.get, tz)
- testTrunc(DateTimeUtils.TRUNC_TO_MILLENNIUM, "2001-01-01", inputTS.get, tz)
+ testTrunc(DateTimeUtils.TRUNC_TO_YEAR, "2015-01-01T00:00:00", inputTS.get, zid)
+ testTrunc(DateTimeUtils.TRUNC_TO_MONTH, "2015-03-01T00:00:00", inputTS.get, zid)
+ testTrunc(DateTimeUtils.TRUNC_TO_DAY, "2015-03-05T00:00:00", inputTS.get, zid)
+ testTrunc(DateTimeUtils.TRUNC_TO_HOUR, "2015-03-05T09:00:00", inputTS.get, zid)
+ testTrunc(DateTimeUtils.TRUNC_TO_MINUTE, "2015-03-05T09:32:00", inputTS.get, zid)
+ testTrunc(DateTimeUtils.TRUNC_TO_SECOND, "2015-03-05T09:32:05", inputTS.get, zid)
+ testTrunc(DateTimeUtils.TRUNC_TO_WEEK, "2015-03-02T00:00:00", inputTS.get, zid)
+ testTrunc(DateTimeUtils.TRUNC_TO_WEEK, "2015-03-30T00:00:00", inputTS1.get, zid)
+ testTrunc(DateTimeUtils.TRUNC_TO_WEEK, "2015-03-30T00:00:00", inputTS2.get, zid)
+ testTrunc(DateTimeUtils.TRUNC_TO_WEEK, "2015-03-30T00:00:00", inputTS3.get, zid)
+ testTrunc(DateTimeUtils.TRUNC_TO_WEEK, "2015-03-23T00:00:00", inputTS4.get, zid)
+ testTrunc(DateTimeUtils.TRUNC_TO_QUARTER, "2015-01-01T00:00:00", inputTS.get, zid)
+ testTrunc(DateTimeUtils.TRUNC_TO_QUARTER, "2015-01-01T00:00:00", inputTS1.get, zid)
+ testTrunc(DateTimeUtils.TRUNC_TO_QUARTER, "2015-04-01T00:00:00", inputTS2.get, zid)
+ testTrunc(DateTimeUtils.TRUNC_TO_DECADE, "1990-01-01", inputTS5.get, zid)
+ testTrunc(DateTimeUtils.TRUNC_TO_CENTURY, "1901-01-01", inputTS5.get, zid)
+ testTrunc(DateTimeUtils.TRUNC_TO_MILLENNIUM, "2001-01-01", inputTS.get, zid)
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index 41d53c9..ba45b9f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -856,4 +856,17 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
TimeZone.setDefault(defaultTz)
}
}
+
+ test("SPARK-30766: date_trunc of old timestamps to hours and days") {
+ def checkTrunc(level: String, expected: String): Unit = {
+ val df = Seq("0010-01-01 01:02:03.123456")
+ .toDF()
+ .select($"value".cast("timestamp").as("ts"))
+ .select(date_trunc(level, $"ts").cast("string"))
+ checkAnswer(df, Row(expected))
+ }
+
+ checkTrunc("HOUR", "0010-01-01 01:00:00")
+ checkTrunc("DAY", "0010-01-01 00:00:00")
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org