Posted to commits@spark.apache.org by we...@apache.org on 2020/03/23 06:45:45 UTC

[spark] branch branch-3.0 updated: [SPARK-31211][SQL] Fix rebasing of 29 February of Julian leap years

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5229d1c  [SPARK-31211][SQL] Fix rebasing of 29 February of Julian leap years
5229d1c is described below

commit 5229d1c66c8817d8fc37b9bc992bf1e411e302b4
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Mon Mar 23 14:21:24 2020 +0800

    [SPARK-31211][SQL] Fix rebasing of 29 February of Julian leap years
    
    In this PR, I propose to fix rebasing of dates in Julian leap years to the Proleptic Gregorian calendar, in which those years are not leap years. In the Julian calendar, every fourth year is a leap year, with a leap day added to February. In the Proleptic Gregorian calendar, every year that is exactly divisible by four is a leap year, except for years that are exactly divisible by 100; those centurial years are leap years only if they are also exactly divisible by 400. As a consequence, dates such as **1000-02-29** exist in the Julian calendar but not in the Proleptic Gregorian calendar.
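
    To make the difference concrete, here is a minimal sketch that contrasts the two leap-year rules. It is not part of the patch and assumes only `java.time.Year`, whose static `isLeap` method implements the proleptic Gregorian rule:
    ```scala
    import java.time.Year

    // Julian rule: every fourth year is a leap year.
    def isJulianLeap(year: Int): Boolean = year % 4 == 0

    // Proleptic Gregorian rule: divisible by 4, except centurial years
    // that are not also divisible by 400.
    def isGregorianLeap(year: Int): Boolean = Year.isLeap(year)

    // 1000 is a leap year only in the Julian calendar,
    // so 1000-02-29 has no Gregorian counterpart.
    assert(isJulianLeap(1000) && !isGregorianLeap(1000))
    // 1600 is divisible by 400, so it is a leap year in both calendars.
    assert(isJulianLeap(1600) && isGregorianLeap(1600))
    ```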
    
    I modified `rebaseJulianToGregorianMicros()` and `rebaseJulianToGregorianDays()` in `DateTimeUtils` to pass 1 as the day of the month while forming the `LocalDate` or `LocalDateTime`, and to add the actual number of days afterwards via the `plusDays()` method. For example, **1000-02-29** doesn't exist in the Proleptic Gregorian calendar, and `LocalDate.of(1000, 2, 29)` throws an exception. To avoid the issue, I build the date `LocalDate.of(1000, 2, 1)` and add 28 days. The `plusDays(28)` method produces the next valid date, **1000-03-01**.
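
    The following is a minimal sketch of this workaround, not part of the patch, using only `java.time.LocalDate`:
    ```scala
    import java.time.LocalDate

    // Constructing the non-existent Gregorian date fails:
    //   LocalDate.of(1000, 2, 29)  // throws java.time.DateTimeException

    // Workaround: start from day 1 of the month and add the remaining days.
    // February 1000 has 28 days in the Proleptic Gregorian calendar,
    // so the 28th added day rolls over into March.
    val rebased = LocalDate.of(1000, 2, 1).plusDays(28)
    assert(rebased == LocalDate.of(1000, 3, 1))
    ```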
    
    Before the changes, a `java.time.DateTimeException` is raised while loading the date `1000-02-29` from parquet files saved by Spark 2.4.5:
    ```scala
    scala> spark.conf.set("spark.sql.legacy.parquet.rebaseDateTime.enabled", true)
    scala> spark.read.parquet("/Users/maxim/tmp/before_1582/2_4_5_date_leap").show
    20/03/21 03:03:59 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
    java.time.DateTimeException: Invalid date 'February 29' as '1000' is not a leap year
    ```
    The parquet files were saved via the commands:
    ```shell
    $ export TZ="America/Los_Angeles"
    ```
    ```scala
    scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
    scala> val df = Seq(java.sql.Date.valueOf("1000-02-29")).toDF("dateS").select($"dateS".as("date"))
    df: org.apache.spark.sql.DataFrame = [date: date]
    scala> df.write.mode("overwrite").parquet("/Users/maxim/tmp/before_1582/2_4_5_date_leap")
    scala> spark.read.parquet("/Users/maxim/tmp/before_1582/2_4_5_date_leap").show
    +----------+
    |      date|
    +----------+
    |1000-02-29|
    +----------+
    ```
    
    Yes, this PR introduces a user-facing change. After the fix:
    ```scala
    scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
    scala> spark.conf.set("spark.sql.legacy.parquet.rebaseDateTime.enabled", true)
    scala> spark.read.parquet("/Users/maxim/tmp/before_1582/2_4_5_date_leap").show
    +----------+
    |      date|
    +----------+
    |1000-03-01|
    +----------+
    ```
    
    Added tests to `DateTimeUtilsSuite`.
    
    Closes #27974 from MaxGekk/julian-date-29-feb.
    
    Authored-by: Maxim Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit db6247faa8780bca8f8d3ba71b568ea63b162973)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/util/DateTimeUtils.scala    | 14 +++-
 .../sql/catalyst/util/DateTimeUtilsSuite.scala     | 82 +++++++++++++++++-----
 2 files changed, 77 insertions(+), 19 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 7f5babf..ba1c509 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -983,11 +983,16 @@ object DateTimeUtils {
     val localDateTime = LocalDateTime.of(
       cal.get(Calendar.YEAR),
       cal.get(Calendar.MONTH) + 1,
-      cal.get(Calendar.DAY_OF_MONTH),
+      // The number of days will be added later to handle non-existing
+      // Julian dates in Proleptic Gregorian calendar.
+      // For example, 1000-02-29 exists in Julian calendar because 1000
+      // is a leap year but it is not a leap year in Gregorian calendar.
+      1,
       cal.get(Calendar.HOUR_OF_DAY),
       cal.get(Calendar.MINUTE),
       cal.get(Calendar.SECOND),
       (Math.floorMod(micros, MICROS_PER_SECOND) * NANOS_PER_MICROS).toInt)
+      .plusDays(cal.get(Calendar.DAY_OF_MONTH) - 1)
     instantToMicros(localDateTime.atZone(ZoneId.systemDefault).toInstant)
   }
 
@@ -1011,7 +1016,12 @@ object DateTimeUtils {
     val localDate = LocalDate.of(
       utcCal.get(Calendar.YEAR),
       utcCal.get(Calendar.MONTH) + 1,
-      utcCal.get(Calendar.DAY_OF_MONTH))
+      // The number of days will be added later to handle non-existing
+      // Julian dates in Proleptic Gregorian calendar.
+      // For example, 1000-02-29 exists in Julian calendar because 1000
+      // is a leap year but it is not a leap year in Gregorian calendar.
+      1)
+      .plusDays(utcCal.get(Calendar.DAY_OF_MONTH) - 1)
     Math.toIntExact(localDate.toEpochDay)
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index f2ad9e6..96da4be 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -670,6 +670,17 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
     }
   }
 
+  private def parseToJulianMicros(s: String): Long = {
+    val ts = Timestamp.valueOf(s)
+    val julianMicros = fromMillis(ts.getTime) +
+      ((ts.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS)
+    julianMicros
+  }
+
+  private def parseToGregMicros(s: String, zoneId: ZoneId): Long = {
+    instantToMicros(LocalDateTime.parse(s).atZone(zoneId).toInstant)
+  }
+
   test("rebase julian to/from gregorian micros") {
     outstandingTimezones.foreach { timeZone =>
       withDefaultTimeZone(timeZone) {
@@ -684,30 +695,27 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
           "1970-01-01 00:00:00.000001", // The epoch day
           "2020-03-14 09:33:01.500000").foreach { ts =>
           withClue(s"time zone = ${timeZone.getID} ts = $ts") {
-            val julianTs = Timestamp.valueOf(ts)
-            val julianMicros = fromMillis(julianTs.getTime) +
-              ((julianTs.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS)
-            val gregorianMicros = instantToMicros(LocalDateTime.parse(ts.replace(' ', 'T'))
-              .atZone(timeZone.toZoneId)
-              .toInstant)
+            val julianMicros = parseToJulianMicros(ts)
+            val gregMicros = parseToGregMicros(ts.replace(' ', 'T'), timeZone.toZoneId)
 
-            assert(rebaseJulianToGregorianMicros(julianMicros) === gregorianMicros)
-            assert(rebaseGregorianToJulianMicros(gregorianMicros) === julianMicros)
+            assert(rebaseJulianToGregorianMicros(julianMicros) === gregMicros)
+            assert(rebaseGregorianToJulianMicros(gregMicros) === julianMicros)
           }
         }
       }
     }
   }
 
+  // millisToDays() and fromJavaDate() are taken from Spark 2.4
+  private def millisToDaysLegacy(millisUtc: Long, timeZone: TimeZone): Int = {
+    val millisLocal = millisUtc + timeZone.getOffset(millisUtc)
+    Math.floor(millisLocal.toDouble / MILLIS_PER_DAY).toInt
+  }
+  private def fromJavaDateLegacy(date: Date): Int = {
+    millisToDaysLegacy(date.getTime, defaultTimeZone())
+  }
+
   test("rebase gregorian to/from julian days") {
-    // millisToDays() and fromJavaDate() are taken from Spark 2.4
-    def millisToDays(millisUtc: Long, timeZone: TimeZone): Int = {
-      val millisLocal = millisUtc + timeZone.getOffset(millisUtc)
-      Math.floor(millisLocal.toDouble / MILLIS_PER_DAY).toInt
-    }
-    def fromJavaDate(date: Date): Int = {
-      millisToDays(date.getTime, defaultTimeZone())
-    }
     outstandingTimezones.foreach { timeZone =>
       withDefaultTimeZone(timeZone) {
         Seq(
@@ -720,7 +728,7 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
           "1969-12-31",
           "1970-01-01", // The epoch day
           "2020-03-14").foreach { date =>
-          val julianDays = fromJavaDate(Date.valueOf(date))
+          val julianDays = fromJavaDateLegacy(Date.valueOf(date))
           val gregorianDays = localDateToDays(LocalDate.parse(date))
 
           assert(rebaseGregorianToJulianDays(gregorianDays) === julianDays)
@@ -729,4 +737,44 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
       }
     }
   }
+
+  test("rebase julian to gregorian date for leap years") {
+    outstandingTimezones.foreach { timeZone =>
+      withDefaultTimeZone(timeZone) {
+        Seq(
+          "1000-02-29" -> "1000-03-01",
+          "1600-02-29" -> "1600-02-29",
+          "1700-02-29" -> "1700-03-01",
+          "2000-02-29" -> "2000-02-29").foreach { case (julianDate, gregDate) =>
+          withClue(s"tz = ${timeZone.getID} julian date = $julianDate greg date = $gregDate") {
+            val date = Date.valueOf(julianDate)
+            val julianDays = fromJavaDateLegacy(date)
+            val gregorianDays = localDateToDays(LocalDate.parse(gregDate))
+
+            assert(rebaseJulianToGregorianDays(julianDays) === gregorianDays)
+          }
+        }
+      }
+    }
+  }
+
+  test("rebase julian to gregorian timestamp for leap years") {
+    outstandingTimezones.foreach { timeZone =>
+      withDefaultTimeZone(timeZone) {
+        Seq(
+          "1000-02-29 01:02:03.123456" -> "1000-03-01T01:02:03.123456",
+          "1600-02-29 11:12:13.654321" -> "1600-02-29T11:12:13.654321",
+          "1700-02-29 21:22:23.000001" -> "1700-03-01T21:22:23.000001",
+          "2000-02-29 00:00:00.999999" -> "2000-02-29T00:00:00.999999"
+        ).foreach { case (julianTs, gregTs) =>
+          withClue(s"tz = ${timeZone.getID} julian ts = $julianTs greg ts = $gregTs") {
+            val julianMicros = parseToJulianMicros(julianTs)
+            val gregorianMicros = parseToGregMicros(gregTs, timeZone.toZoneId)
+
+            assert(rebaseJulianToGregorianMicros(julianMicros) === gregorianMicros)
+          }
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org