Posted to commits@spark.apache.org by we...@apache.org on 2020/02/14 14:29:12 UTC

[spark] branch branch-3.0 updated: [SPARK-30766][SQL] Fix the timestamp truncation to the `HOUR` and `DAY` levels

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0a8d7a3  [SPARK-30766][SQL] Fix the timestamp truncation to the `HOUR` and `DAY` levels
0a8d7a3 is described below

commit 0a8d7a35e24acbd7af57fe5169691afb8ccd8675
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Fri Feb 14 22:16:57 2020 +0800

    [SPARK-30766][SQL] Fix the timestamp truncation to the `HOUR` and `DAY` levels
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to use the Java 8 time API for timestamp truncations to the `HOUR` and `DAY` levels. The problem is the usage of `timeZone.getOffset(millis)` in the day/hour truncations, where the hybrid calendar (Julian + Gregorian) is used underneath.
    
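    A minimal sketch of the new approach (close to, but not exactly, the added `truncToUnit` helper; the real code uses the `microsToInstant`/`instantToMicros` helpers from `DateTimeUtils` in place of the epoch arithmetic below):
    ```scala
    import java.time.{Instant, ZoneId}
    import java.time.temporal.ChronoUnit

    // Truncate microseconds since the epoch to a whole hour or day,
    // computed on the Proleptic Gregorian calendar of the Java 8 time API.
    def truncToUnit(micros: Long, zoneId: ZoneId, unit: ChronoUnit): Long = {
      val truncated = Instant.EPOCH.plus(micros, ChronoUnit.MICROS)
        .atZone(zoneId)    // interpret the instant in the given time zone
        .truncatedTo(unit) // ChronoUnit.HOURS or ChronoUnit.DAYS
      ChronoUnit.MICROS.between(Instant.EPOCH, truncated.toInstant)
    }
    ```
    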
    ### Why are the changes needed?
    The change fixes wrong truncations. For example, the following truncation to hours should print `0010-01-01 01:00:00`, but it outputs a wrong timestamp:
    ```scala
    Seq("0010-01-01 01:02:03.123456").toDF()
        .select($"value".cast("timestamp").as("ts"))
        .select(date_trunc("HOUR", $"ts").cast("string"))
        .show(false)
    +------------------------------------+
    |CAST(date_trunc(HOUR, ts) AS STRING)|
    +------------------------------------+
    |0010-01-01 01:30:17                 |
    +------------------------------------+
    ```
    
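    For such old timestamps, the legacy `java.util.TimeZone` API and the Java 8 time API may disagree about the zone offset, which is where the odd `30:17` residue can come from (an illustrative check, assuming a session time zone with a non-whole-hour local mean time, such as `Europe/Moscow` with its LMT offset of `+02:30:17`):
    ```scala
    import java.time.{Instant, ZoneId}
    import java.util.TimeZone

    // Compare the offsets the two APIs report for the same old instant.
    // The legacy API resolves instants through the hybrid Julian + Gregorian
    // calendar, so the two values may differ for dates this far in the past.
    val instant = Instant.parse("0010-01-01T00:00:00Z")
    val legacyMillis = TimeZone.getTimeZone("Europe/Moscow").getOffset(instant.toEpochMilli)
    val modernMillis = ZoneId.of("Europe/Moscow").getRules.getOffset(instant).getTotalSeconds * 1000L
    println(s"legacy: $legacyMillis ms, java.time: $modernMillis ms")
    ```
    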
    ### Does this PR introduce any user-facing change?
    Yes. After the changes, the result of the example above is:
    ```scala
    +------------------------------------+
    |CAST(date_trunc(HOUR, ts) AS STRING)|
    +------------------------------------+
    |0010-01-01 01:00:00                 |
    +------------------------------------+
    ```
    
    ### How was this patch tested?
    - Added a new test to `DateFunctionsSuite`
    - By `DateExpressionsSuite` and `DateTimeUtilsSuite`
    
    Closes #27512 from MaxGekk/fix-trunc-old-timestamp.
    
    Authored-by: Maxim Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 7137a6d065edeaab97bf5bf49ffaca3d060a14fe)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../catalyst/expressions/datetimeExpressions.scala |  6 +--
 .../spark/sql/catalyst/util/DateTimeUtils.scala    | 44 +++++++++++-----------
 .../sql/catalyst/util/DateTimeUtilsSuite.scala     | 39 +++++++++----------
 .../org/apache/spark/sql/DateFunctionsSuite.scala  | 13 +++++++
 4 files changed, 59 insertions(+), 43 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index cf91489..adf7251 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -1690,15 +1690,15 @@ case class TruncTimestamp(
 
   override def eval(input: InternalRow): Any = {
     evalHelper(input, minLevel = MIN_LEVEL_OF_TIMESTAMP_TRUNC) { (t: Any, level: Int) =>
-      DateTimeUtils.truncTimestamp(t.asInstanceOf[Long], level, timeZone)
+      DateTimeUtils.truncTimestamp(t.asInstanceOf[Long], level, zoneId)
     }
   }
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    val tz = ctx.addReferenceObj("timeZone", timeZone)
+    val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
     codeGenHelper(ctx, ev, minLevel = MIN_LEVEL_OF_TIMESTAMP_TRUNC, true) {
       (date: String, fmt: String) =>
-        s"truncTimestamp($date, $fmt, $tz);"
+        s"truncTimestamp($date, $fmt, $zid);"
     }
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 01d36f1..ce0c138 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -711,32 +711,34 @@ object DateTimeUtils {
     }
   }
 
+  private def truncToUnit(t: SQLTimestamp, zoneId: ZoneId, unit: ChronoUnit): SQLTimestamp = {
+    val truncated = microsToInstant(t).atZone(zoneId).truncatedTo(unit)
+    instantToMicros(truncated.toInstant)
+  }
+
   /**
    * Returns the trunc date time from original date time and trunc level.
    * Trunc level should be generated using `parseTruncLevel()`, should be between 0 and 12.
    */
-  def truncTimestamp(t: SQLTimestamp, level: Int, timeZone: TimeZone): SQLTimestamp = {
-    if (level == TRUNC_TO_MICROSECOND) return t
-    var millis = MICROSECONDS.toMillis(t)
-    val truncated = level match {
-      case TRUNC_TO_MILLISECOND => millis
-      case TRUNC_TO_SECOND =>
-        millis - millis % MILLIS_PER_SECOND
-      case TRUNC_TO_MINUTE =>
-        millis - millis % MILLIS_PER_MINUTE
-      case TRUNC_TO_HOUR =>
-        val offset = timeZone.getOffset(millis)
-        millis += offset
-        millis - millis % MILLIS_PER_HOUR - offset
-      case TRUNC_TO_DAY =>
-        val offset = timeZone.getOffset(millis)
-        millis += offset
-        millis - millis % MILLIS_PER_DAY - offset
-      case _ => // Try to truncate date levels
-        val dDays = millisToDays(millis, timeZone.toZoneId)
-        daysToMillis(truncDate(dDays, level), timeZone.toZoneId)
+  def truncTimestamp(t: SQLTimestamp, level: Int, zoneId: ZoneId): SQLTimestamp = {
+    level match {
+      case TRUNC_TO_MICROSECOND => t
+      case TRUNC_TO_HOUR => truncToUnit(t, zoneId, ChronoUnit.HOURS)
+      case TRUNC_TO_DAY => truncToUnit(t, zoneId, ChronoUnit.DAYS)
+      case _ =>
+        val millis = MICROSECONDS.toMillis(t)
+        val truncated = level match {
+          case TRUNC_TO_MILLISECOND => millis
+          case TRUNC_TO_SECOND =>
+            millis - millis % MILLIS_PER_SECOND
+          case TRUNC_TO_MINUTE =>
+            millis - millis % MILLIS_PER_MINUTE
+          case _ => // Try to truncate date levels
+            val dDays = millisToDays(millis, zoneId)
+            daysToMillis(truncDate(dDays, level), zoneId)
+        }
+        truncated * MICROS_PER_MILLIS
     }
-    truncated * MICROS_PER_MILLIS
   }
 
   /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index cd0594c..ff4d8a2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -499,9 +499,9 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
         level: Int,
         expected: String,
         inputTS: SQLTimestamp,
-        timezone: TimeZone = DateTimeUtils.defaultTimeZone()): Unit = {
+        zoneId: ZoneId = defaultZoneId): Unit = {
       val truncated =
-        DateTimeUtils.truncTimestamp(inputTS, level, timezone)
+        DateTimeUtils.truncTimestamp(inputTS, level, zoneId)
       val expectedTS = toTimestamp(expected, defaultZoneId)
       assert(truncated === expectedTS.get)
     }
@@ -539,6 +539,7 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
 
     for (tz <- ALL_TIMEZONES) {
       withDefaultTimeZone(tz) {
+        val zid = tz.toZoneId
         val inputTS = DateTimeUtils.stringToTimestamp(
           UTF8String.fromString("2015-03-05T09:32:05.359"), defaultZoneId)
         val inputTS1 = DateTimeUtils.stringToTimestamp(
@@ -552,23 +553,23 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
         val inputTS5 = DateTimeUtils.stringToTimestamp(
           UTF8String.fromString("1999-03-29T01:02:03.456789"), defaultZoneId)
 
-        testTrunc(DateTimeUtils.TRUNC_TO_YEAR, "2015-01-01T00:00:00", inputTS.get, tz)
-        testTrunc(DateTimeUtils.TRUNC_TO_MONTH, "2015-03-01T00:00:00", inputTS.get, tz)
-        testTrunc(DateTimeUtils.TRUNC_TO_DAY, "2015-03-05T00:00:00", inputTS.get, tz)
-        testTrunc(DateTimeUtils.TRUNC_TO_HOUR, "2015-03-05T09:00:00", inputTS.get, tz)
-        testTrunc(DateTimeUtils.TRUNC_TO_MINUTE, "2015-03-05T09:32:00", inputTS.get, tz)
-        testTrunc(DateTimeUtils.TRUNC_TO_SECOND, "2015-03-05T09:32:05", inputTS.get, tz)
-        testTrunc(DateTimeUtils.TRUNC_TO_WEEK, "2015-03-02T00:00:00", inputTS.get, tz)
-        testTrunc(DateTimeUtils.TRUNC_TO_WEEK, "2015-03-30T00:00:00", inputTS1.get, tz)
-        testTrunc(DateTimeUtils.TRUNC_TO_WEEK, "2015-03-30T00:00:00", inputTS2.get, tz)
-        testTrunc(DateTimeUtils.TRUNC_TO_WEEK, "2015-03-30T00:00:00", inputTS3.get, tz)
-        testTrunc(DateTimeUtils.TRUNC_TO_WEEK, "2015-03-23T00:00:00", inputTS4.get, tz)
-        testTrunc(DateTimeUtils.TRUNC_TO_QUARTER, "2015-01-01T00:00:00", inputTS.get, tz)
-        testTrunc(DateTimeUtils.TRUNC_TO_QUARTER, "2015-01-01T00:00:00", inputTS1.get, tz)
-        testTrunc(DateTimeUtils.TRUNC_TO_QUARTER, "2015-04-01T00:00:00", inputTS2.get, tz)
-        testTrunc(DateTimeUtils.TRUNC_TO_DECADE, "1990-01-01", inputTS5.get, tz)
-        testTrunc(DateTimeUtils.TRUNC_TO_CENTURY, "1901-01-01", inputTS5.get, tz)
-        testTrunc(DateTimeUtils.TRUNC_TO_MILLENNIUM, "2001-01-01", inputTS.get, tz)
+        testTrunc(DateTimeUtils.TRUNC_TO_YEAR, "2015-01-01T00:00:00", inputTS.get, zid)
+        testTrunc(DateTimeUtils.TRUNC_TO_MONTH, "2015-03-01T00:00:00", inputTS.get, zid)
+        testTrunc(DateTimeUtils.TRUNC_TO_DAY, "2015-03-05T00:00:00", inputTS.get, zid)
+        testTrunc(DateTimeUtils.TRUNC_TO_HOUR, "2015-03-05T09:00:00", inputTS.get, zid)
+        testTrunc(DateTimeUtils.TRUNC_TO_MINUTE, "2015-03-05T09:32:00", inputTS.get, zid)
+        testTrunc(DateTimeUtils.TRUNC_TO_SECOND, "2015-03-05T09:32:05", inputTS.get, zid)
+        testTrunc(DateTimeUtils.TRUNC_TO_WEEK, "2015-03-02T00:00:00", inputTS.get, zid)
+        testTrunc(DateTimeUtils.TRUNC_TO_WEEK, "2015-03-30T00:00:00", inputTS1.get, zid)
+        testTrunc(DateTimeUtils.TRUNC_TO_WEEK, "2015-03-30T00:00:00", inputTS2.get, zid)
+        testTrunc(DateTimeUtils.TRUNC_TO_WEEK, "2015-03-30T00:00:00", inputTS3.get, zid)
+        testTrunc(DateTimeUtils.TRUNC_TO_WEEK, "2015-03-23T00:00:00", inputTS4.get, zid)
+        testTrunc(DateTimeUtils.TRUNC_TO_QUARTER, "2015-01-01T00:00:00", inputTS.get, zid)
+        testTrunc(DateTimeUtils.TRUNC_TO_QUARTER, "2015-01-01T00:00:00", inputTS1.get, zid)
+        testTrunc(DateTimeUtils.TRUNC_TO_QUARTER, "2015-04-01T00:00:00", inputTS2.get, zid)
+        testTrunc(DateTimeUtils.TRUNC_TO_DECADE, "1990-01-01", inputTS5.get, zid)
+        testTrunc(DateTimeUtils.TRUNC_TO_CENTURY, "1901-01-01", inputTS5.get, zid)
+        testTrunc(DateTimeUtils.TRUNC_TO_MILLENNIUM, "2001-01-01", inputTS.get, zid)
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index 41d53c9..ba45b9f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -856,4 +856,17 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
       TimeZone.setDefault(defaultTz)
     }
   }
+
+  test("SPARK-30766: date_trunc of old timestamps to hours and days") {
+    def checkTrunc(level: String, expected: String): Unit = {
+      val df = Seq("0010-01-01 01:02:03.123456")
+        .toDF()
+        .select($"value".cast("timestamp").as("ts"))
+        .select(date_trunc(level, $"ts").cast("string"))
+      checkAnswer(df, Row(expected))
+    }
+
+    checkTrunc("HOUR", "0010-01-01 01:00:00")
+    checkTrunc("DAY", "0010-01-01 00:00:00")
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org