Posted to commits@spark.apache.org by we...@apache.org on 2020/03/19 06:47:40 UTC

[spark] branch branch-3.0 updated: [SPARK-31159][SQL] Rebase date/timestamp from/to Julian calendar in parquet

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d712a7a  [SPARK-31159][SQL] Rebase date/timestamp from/to Julian calendar in parquet
d712a7a is described below

commit d712a7aeef4606a2ab3102b04545cde218eb712a
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Thu Mar 19 12:49:51 2020 +0800

    [SPARK-31159][SQL] Rebase date/timestamp from/to Julian calendar in parquet
    
    The PR addresses compatibility with Spark 2.4 and earlier versions in reading/writing dates and timestamps via the Parquet datasource. Previous releases are based on a hybrid calendar (Julian + Gregorian). Since Spark 3.0, the Proleptic Gregorian calendar is used by default, see SPARK-26651. In particular, the issue pops up for dates/timestamps before 1582-10-15, when the hybrid calendar switches between the Julian and Gregorian calendars. The same local date is mapped to a different number of days since the epoch 1970-01-01 in each calendar. For example, the date 0001-01-01 is converted to:
    - -719164 in the Julian calendar. Spark 2.4 saves this number as a value of DATE type into parquet.
    - -719162 in the Proleptic Gregorian calendar. Spark 3.0 saves this number as the date value.
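    
    A minimal sketch of the two mappings, assuming only the JDK (`java.time` for the Proleptic Gregorian calendar, `java.util.Calendar` with the `gregory` type for the hybrid one), mirroring the rebasing code added below:
    ```scala
    import java.time.LocalDate
    import java.util.{Calendar, TimeZone}

    // Proleptic Gregorian: java.time maps 0001-01-01 to epoch day -719162.
    val gregorianDays = LocalDate.of(1, 1, 1).toEpochDay

    // Hybrid Julian + Gregorian: java.util.Calendar maps the same local date
    // to -719164 (computed in UTC so the day arithmetic stays exact).
    val MILLIS_PER_DAY = 24L * 60 * 60 * 1000
    val cal = new Calendar.Builder()
      .setCalendarType("gregory")
      .setTimeZone(TimeZone.getTimeZone("UTC"))
      .setDate(1, Calendar.JANUARY, 1)
      .build()
    val julianDays = Math.floorDiv(cal.getTimeInMillis, MILLIS_PER_DAY)
    ```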
    
    According to the parquet spec, parquet timestamps of the `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS` output types and parquet dates should be based on the Proleptic Gregorian calendar, but `INT96` timestamps should be stored as Julian days. Since version 3.0, Spark conforms to the spec, but for backward compatibility with previous versions, the PR proposes rebasing between the Proleptic Gregorian calendar and the hybrid one under the SQL config:
    ```
    spark.sql.legacy.parquet.rebaseDateTime.enabled
    ```
    which is set to `false` by default, meaning the rebasing is not performed unless the config is enabled.
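    
    For reference, a sketch of how an `INT96` timestamp decomposes into a Julian day number and nanoseconds of the day, matching the arithmetic in `DateTimeUtils.toJulianDay` in the diff below (2440588 is Spark's `JULIAN_DAY_OF_EPOCH`, the Julian day number of 1970-01-01):
    ```scala
    // Sketch only: splits microseconds since the epoch into the (julianDay, nanosOfDay)
    // pair stored by the INT96 parquet type; this PR rebases the input micros first.
    val JULIAN_DAY_OF_EPOCH = 2440588L
    val MICROS_PER_DAY = 24L * 60 * 60 * 1000 * 1000

    def toJulianDay(us: Long): (Int, Long) = {
      val julianUs = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
      ((julianUs / MICROS_PER_DAY).toInt, (julianUs % MICROS_PER_DAY) * 1000L)
    }
    ```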
    
    The details of the implementation:
    1. Added 2 methods to `DateTimeUtils` for rebasing microseconds. `rebaseGregorianToJulianMicros()` builds a local timestamp in the Proleptic Gregorian calendar, extracts the date-time fields `year`, `month`, ..., `second fraction` from it, and uses them to build another local timestamp based on the hybrid calendar (using the `java.util.Calendar` API). After that, it calculates the number of microseconds since the epoch from the resulting local timestamp. The conversion goes through the default JVM time zone (see `ZoneId.systemDefault` in the diff). `rebaseJulianToGregorianMicros()` performs the reverse conversion; a round-trip sketch is shown after this list.
    2. Added 2 methods to `DateTimeUtils` for rebasing days. `rebaseGregorianToJulianDays()` builds a local date from the passed number of days since the epoch in the Proleptic Gregorian calendar, interprets the resulting date as a local date in the hybrid calendar, and gets the number of days since the epoch from that local date. The conversion is performed via the `UTC` time zone because it is independent of time zones, and `UTC` avoids rounding issues when casting days to milliseconds and back. `rebaseJulianToGregorianDays()` performs the reverse conversion.
    3. Use `rebaseGregorianToJulianMicros()` and `rebaseGregorianToJulianDays()` while saving timestamps/dates to parquet files if the SQL config is on.
    4. Use `rebaseJulianToGregorianMicros()` and `rebaseJulianToGregorianDays()` while loading timestamps/dates from parquet files if the SQL config is on.
    5. The SQL config `spark.sql.legacy.parquet.rebaseDateTime.enabled` controls the conversions of dates and of timestamps of the `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS` types; see the SQL config `spark.sql.parquet.outputTimestampType`.
    6. The rebasing is always performed for `INT96` timestamps, independently of `spark.sql.legacy.parquet.rebaseDateTime.enabled`.
    7. Supported the vectorized parquet reader, see the SQL config `spark.sql.parquet.enableVectorizedReader`.
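    
    As referenced in item 1, a sketch of the round-trip property of the microsecond rebasing, assuming a build that includes the new `DateTimeUtils` methods (the timestamp matches the one used in the tests below):
    ```scala
    import java.time.{LocalDateTime, ZoneId}
    import org.apache.spark.sql.catalyst.util.DateTimeUtils._

    // Micros since the epoch for 1001-01-01 01:02:03.123456 in the default JVM
    // time zone, in the Proleptic Gregorian calendar (java.time).
    val instant = LocalDateTime.parse("1001-01-01T01:02:03.123456")
      .atZone(ZoneId.systemDefault)
      .toInstant
    val gregorianMicros = instant.getEpochSecond * 1000000L + instant.getNano / 1000

    // Rebase to the hybrid calendar and back; the round trip restores the input.
    val julianMicros = rebaseGregorianToJulianMicros(gregorianMicros)
    assert(rebaseJulianToGregorianMicros(julianMicros) == gregorianMicros)
    ```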
    
    - For backward compatibility with Spark 2.4 and earlier versions. The changes allow users to read dates/timestamps saved by previous versions and get the same result. Also, after the changes, users can enable the rebasing in write and save dates/timestamps that can be loaded correctly by Spark 2.4 and earlier versions.
    - It fixes the bug of incorrectly saving/loading timestamps of the `INT96` type.
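    
    A sketch of the `INT96` round trip that this fix restores (`toJulianDay`/`fromJulianDay` now always rebase, see item 6), assuming the updated `DateTimeUtils`; the sample value is illustrative:
    ```scala
    import org.apache.spark.sql.catalyst.util.DateTimeUtils._

    // toJulianDay now rebases Proleptic Gregorian -> hybrid calendar before
    // splitting into (julian day, nanos of day); fromJulianDay rebases back,
    // so the pair round-trips even for timestamps before 1582-10-15.
    val micros = -30610224000000000L  // a timestamp around year 1000
    val (day, nanos) = toJulianDay(micros)
    assert(fromJulianDay(day, nanos) == micros)
    ```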
    
    Yes, this introduces a user-facing change: the timestamp `1001-01-01 01:02:03.123456` saved by Spark 2.4.5 as `TIMESTAMP_MICROS` is interpreted differently by Spark 3.0.0-preview2:
    ```scala
    scala> spark.read.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_micros").show(false)
    +--------------------------+
    |ts                        |
    +--------------------------+
    |1001-01-07 11:32:20.123456|
    +--------------------------+
    ```
    After the changes:
    ```scala
    scala> spark.conf.set("spark.sql.legacy.parquet.rebaseDateTime.enabled", true)
    
    scala> spark.read.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_micros").show(false)
    +--------------------------+
    |ts                        |
    +--------------------------+
    |1001-01-01 01:02:03.123456|
    +--------------------------+
    ```
    
    1. Added tests to `ParquetIOSuite` to check rebasing in read for both the regular and the vectorized parquet readers. The tests read back parquet files saved by Spark 2.4.5 via:
    ```shell
    $ export TZ="America/Los_Angeles"
    ```
    ```scala
    scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
    scala> val df = Seq("1001-01-01").toDF("dateS").select($"dateS".cast("date").as("date"))
    df: org.apache.spark.sql.DataFrame = [date: date]
    scala> df.write.parquet("/Users/maxim/tmp/before_1582/2_4_5_date")
    
    scala> val df = Seq("1001-01-01 01:02:03.123456").toDF("tsS").select($"tsS".cast("timestamp").as("ts"))
    df: org.apache.spark.sql.DataFrame = [ts: timestamp]
    
    scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
    scala> df.write.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_micros")
    
    scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
    scala> df.write.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_millis")
    
    scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "INT96")
    scala> df.write.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_int96")
    ```
    2. Manually checked the write code path. Save dates/timestamps (TIMESTAMP_MICROS, TIMESTAMP_MILLIS, INT96) with Spark 3.1.0-SNAPSHOT (after the changes):
    ```shell
    $ export TZ="America/Los_Angeles"
    ```
    ```scala
    scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
    scala> spark.conf.set("spark.sql.legacy.parquet.rebaseDateTime.enabled", true)
    scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
    scala> val df = Seq(("1001-01-01", "1001-01-01 01:02:03.123456")).toDF("dateS", "tsS").select($"dateS".cast("date").as("d"), $"tsS".cast("timestamp").as("ts"))
    df: org.apache.spark.sql.DataFrame = [d: date, ts: timestamp]
    scala> df.write.parquet("/Users/maxim/tmp/before_1582/3_0_0_micros")
    scala> spark.read.parquet("/Users/maxim/tmp/before_1582/3_0_0_micros").show(false)
    +----------+--------------------------+
    |d         |ts                        |
    +----------+--------------------------+
    |1001-01-01|1001-01-01 01:02:03.123456|
    +----------+--------------------------+
    ```
    Read the saved date/timestamp by Spark 2.4.5:
    ```scala
    scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
    scala> spark.read.parquet("/Users/maxim/tmp/before_1582/3_0_0_micros").show(false)
    +----------+--------------------------+
    |d         |ts                        |
    +----------+--------------------------+
    |1001-01-01|1001-01-01 01:02:03.123456|
    +----------+--------------------------+
    ```
    
    Closes #27915 from MaxGekk/rebase-parquet-datetime.
    
    Authored-by: Maxim Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit bb295d80e37440e6ee67a00bc09df2e9ff6e4e46)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/util/DateTimeUtils.scala    | 106 ++++++++++++++++++++-
 .../org/apache/spark/sql/internal/SQLConf.scala    |  15 +++
 .../sql/catalyst/util/DateTimeUtilsSuite.scala     |  60 ++++++++++++
 .../parquet/VectorizedColumnReader.java            |  44 ++++++++-
 .../datasources/parquet/ParquetRowConverter.scala  |  49 ++++++++--
 .../datasources/parquet/ParquetWriteSupport.scala  |  19 ++++
 .../test-data/before_1582_date_v2_4.snappy.parquet | Bin 0 -> 398 bytes
 ...before_1582_timestamp_int96_v2_4.snappy.parquet | Bin 0 -> 494 bytes
 ...efore_1582_timestamp_micros_v2_4.snappy.parquet | Bin 0 -> 436 bytes
 ...efore_1582_timestamp_millis_v2_4.snappy.parquet | Bin 0 -> 436 bytes
 .../datasources/parquet/ParquetIOSuite.scala       |  66 +++++++++++++
 11 files changed, 342 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index a8c55e7..7f5babf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 import java.time._
 import java.time.temporal.{ChronoField, ChronoUnit, IsoFields}
-import java.util.{Locale, TimeZone}
+import java.util.{Calendar, Locale, TimeZone}
 import java.util.concurrent.TimeUnit._
 
 import scala.util.control.NonFatal
@@ -150,7 +150,9 @@ object DateTimeUtils {
   def fromJulianDay(day: Int, nanoseconds: Long): SQLTimestamp = {
     // use Long to avoid rounding errors
     val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY
-    SECONDS.toMicros(seconds) + NANOSECONDS.toMicros(nanoseconds)
+    val micros = SECONDS.toMicros(seconds) + NANOSECONDS.toMicros(nanoseconds)
+    // Rebase from the hybrid Julian calendar to the Proleptic Gregorian one
+    rebaseJulianToGregorianMicros(micros)
   }
 
   /**
@@ -159,7 +161,7 @@ object DateTimeUtils {
    * Note: support timestamp since 4717 BC (without negative nanoseconds, compatible with Hive).
    */
   def toJulianDay(us: SQLTimestamp): (Int, Long) = {
-    val julian_us = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
+    val julian_us = rebaseGregorianToJulianMicros(us) + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
     val day = julian_us / MICROS_PER_DAY
     val micros = julian_us % MICROS_PER_DAY
     (day.toInt, MICROSECONDS.toNanos(micros))
@@ -942,4 +944,102 @@ object DateTimeUtils {
     val days = period.getDays
     new CalendarInterval(months, days, 0)
   }
+
+  /**
+   * Converts the given microseconds to a local date-time in the default JVM time zone in the
+   * Proleptic Gregorian calendar, interprets the result as a local date-time in the Julian
+   * calendar in the same time zone, and takes the microseconds since the epoch from it.
+   *
+   * @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'.
+   * @return The rebased microseconds since the epoch in Julian calendar.
+   */
+  def rebaseGregorianToJulianMicros(micros: Long): Long = {
+    val ldt = microsToInstant(micros).atZone(ZoneId.systemDefault).toLocalDateTime
+    val cal = new Calendar.Builder()
+      // `gregory` is a hybrid calendar that supports both
+      // the Julian and Gregorian calendar systems
+      .setCalendarType("gregory")
+      .setDate(ldt.getYear, ldt.getMonthValue - 1, ldt.getDayOfMonth)
+      .setTimeOfDay(ldt.getHour, ldt.getMinute, ldt.getSecond)
+      .build()
+    fromMillis(cal.getTimeInMillis) + ldt.get(ChronoField.MICRO_OF_SECOND)
+  }
+
+  /**
+   * Converts the given microseconds to a local date-time in the default JVM time zone in the
+   * Julian calendar, interprets the result as a local date-time in the Proleptic Gregorian
+   * calendar in the same time zone, and takes the microseconds since the epoch from it.
+   *
+   * @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'.
+   * @return The rebased microseconds since the epoch in Proleptic Gregorian calendar.
+   */
+  def rebaseJulianToGregorianMicros(micros: Long): Long = {
+    val cal = new Calendar.Builder()
+      // `gregory` is a hybrid calendar that supports both
+      // the Julian and Gregorian calendar systems
+      .setCalendarType("gregory")
+      .setInstant(toMillis(micros))
+      .build()
+    val localDateTime = LocalDateTime.of(
+      cal.get(Calendar.YEAR),
+      cal.get(Calendar.MONTH) + 1,
+      cal.get(Calendar.DAY_OF_MONTH),
+      cal.get(Calendar.HOUR_OF_DAY),
+      cal.get(Calendar.MINUTE),
+      cal.get(Calendar.SECOND),
+      (Math.floorMod(micros, MICROS_PER_SECOND) * NANOS_PER_MICROS).toInt)
+    instantToMicros(localDateTime.atZone(ZoneId.systemDefault).toInstant)
+  }
+
+  /**
+   * Converts the given number of days since the epoch day 1970-01-01 to
+   * a local date in Julian calendar, interprets the result as a local
+   * date in Proleptic Gregorian calendar, and takes the number of days
+   * since the epoch from the Gregorian date.
+   *
+   * @param days The number of days since the epoch in Julian calendar.
+   * @return The rebased number of days in Gregorian calendar.
+   */
+  def rebaseJulianToGregorianDays(days: Int): Int = {
+    val utcCal = new Calendar.Builder()
+      // `gregory` is a hybrid calendar that supports both
+      // the Julian and Gregorian calendar systems
+      .setCalendarType("gregory")
+      .setTimeZone(TimeZoneUTC)
+      .setInstant(Math.multiplyExact(days, MILLIS_PER_DAY))
+      .build()
+    val localDate = LocalDate.of(
+      utcCal.get(Calendar.YEAR),
+      utcCal.get(Calendar.MONTH) + 1,
+      utcCal.get(Calendar.DAY_OF_MONTH))
+    Math.toIntExact(localDate.toEpochDay)
+  }
+
+  /**
+   * Rebases days since the epoch so that Spark stores the same number of days
+   * as Spark 2.4 and earlier versions. Spark 3.0 switched to the
+   * Proleptic Gregorian calendar (see SPARK-26651); as a consequence,
+   * this affects dates before 1582-10-15. Spark 2.4 and earlier versions use
+   * the Julian calendar for such dates, so the same local date may be
+   * mapped to a different number of days since the epoch in each calendar.
+   *
+   * For example:
+   *   Proleptic Gregorian calendar: 1582-01-01 -> -141714
+   *   Julian calendar: 1582-01-01 -> -141704
+   * The code below converts -141714 to -141704.
+   *
+   * @param days The number of days since the epoch 1970-01-01. It can be negative.
+   * @return The rebased number of days since the epoch in Julian calendar.
+   */
+  def rebaseGregorianToJulianDays(days: Int): Int = {
+    val localDate = LocalDate.ofEpochDay(days)
+    val utcCal = new Calendar.Builder()
+      // `gregory` is a hybrid calendar that supports both
+      // the Julian and Gregorian calendar systems
+      .setCalendarType("gregory")
+      .setTimeZone(TimeZoneUTC)
+      .setDate(localDate.getYear, localDate.getMonthValue - 1, localDate.getDayOfMonth)
+      .build()
+    Math.toIntExact(Math.floorDiv(utcCal.getTimeInMillis, MILLIS_PER_DAY))
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1331350..a47f7d9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2232,6 +2232,19 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val LEGACY_PARQUET_REBASE_DATETIME =
+    buildConf("spark.sql.legacy.parquet.rebaseDateTime.enabled")
+      .internal()
+      .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
+        "to the hybrid calendar (Julian + Gregorian) in write and " +
+        "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
+        "The rebasing is performed by converting micros/millis/days to " +
+        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
+        "timestamp in the target calendar, and getting the number of micros/millis/days " +
+        "since the epoch 1970-01-01 00:00:00Z.")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
@@ -2807,6 +2820,8 @@ class SQLConf extends Serializable with Logging {
 
   def csvFilterPushDown: Boolean = getConf(CSV_FILTER_PUSHDOWN_ENABLED)
 
+  def parquetRebaseDateTimeEnabled: Boolean = getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 0df395b..f2ad9e6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -669,4 +669,64 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
       assert(toDate("tomorrow CET ", zoneId).get === today + 1)
     }
   }
+
+  test("rebase julian to/from gregorian micros") {
+    outstandingTimezones.foreach { timeZone =>
+      withDefaultTimeZone(timeZone) {
+        Seq(
+          "0001-01-01 01:02:03.654321",
+          "1000-01-01 03:02:01.123456",
+          "1582-10-04 00:00:00.000000",
+          "1582-10-15 00:00:00.999999", // Gregorian cutover day
+          "1883-11-10 00:00:00.000000", // America/Los_Angeles -7:52:58 zone offset
+          "1883-11-20 00:00:00.000000", // America/Los_Angeles -08:00 zone offset
+          "1969-12-31 11:22:33.000100",
+          "1970-01-01 00:00:00.000001", // The epoch day
+          "2020-03-14 09:33:01.500000").foreach { ts =>
+          withClue(s"time zone = ${timeZone.getID} ts = $ts") {
+            val julianTs = Timestamp.valueOf(ts)
+            val julianMicros = fromMillis(julianTs.getTime) +
+              ((julianTs.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS)
+            val gregorianMicros = instantToMicros(LocalDateTime.parse(ts.replace(' ', 'T'))
+              .atZone(timeZone.toZoneId)
+              .toInstant)
+
+            assert(rebaseJulianToGregorianMicros(julianMicros) === gregorianMicros)
+            assert(rebaseGregorianToJulianMicros(gregorianMicros) === julianMicros)
+          }
+        }
+      }
+    }
+  }
+
+  test("rebase gregorian to/from julian days") {
+    // millisToDays() and fromJavaDate() are taken from Spark 2.4
+    def millisToDays(millisUtc: Long, timeZone: TimeZone): Int = {
+      val millisLocal = millisUtc + timeZone.getOffset(millisUtc)
+      Math.floor(millisLocal.toDouble / MILLIS_PER_DAY).toInt
+    }
+    def fromJavaDate(date: Date): Int = {
+      millisToDays(date.getTime, defaultTimeZone())
+    }
+    outstandingTimezones.foreach { timeZone =>
+      withDefaultTimeZone(timeZone) {
+        Seq(
+          "0001-01-01",
+          "1000-01-01",
+          "1582-10-04",
+          "1582-10-15", // Gregorian cutover day
+          "1883-11-10", // America/Los_Angeles -7:52:58 zone offset
+          "1883-11-20", // America/Los_Angeles -08:00 zone offset
+          "1969-12-31",
+          "1970-01-01", // The epoch day
+          "2020-03-14").foreach { date =>
+          val julianDays = fromJavaDate(Date.valueOf(date))
+          val gregorianDays = localDateToDays(LocalDate.parse(date))
+
+          assert(rebaseGregorianToJulianDays(gregorianDays) === julianDays)
+          assert(rebaseJulianToGregorianDays(julianDays) === gregorianDays)
+        }
+      }
+    }
+  }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 3294655..f9b6139 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -37,6 +37,7 @@ import org.apache.parquet.schema.PrimitiveType;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DecimalType;
 
@@ -101,6 +102,7 @@ public class VectorizedColumnReader {
   // The timezone conversion to apply to int96 timestamps. Null if no conversion.
   private final ZoneId convertTz;
   private static final ZoneId UTC = ZoneOffset.UTC;
+  private final boolean rebaseDateTime;
 
   public VectorizedColumnReader(
       ColumnDescriptor descriptor,
@@ -129,6 +131,7 @@ public class VectorizedColumnReader {
     if (totalValueCount == 0) {
       throw new IOException("totalValueCount == 0");
     }
+    this.rebaseDateTime = SQLConf.get().parquetRebaseDateTimeEnabled();
   }
 
   /**
@@ -407,7 +410,7 @@ public class VectorizedColumnReader {
   private void readIntBatch(int rowId, int num, WritableColumnVector column) throws IOException {
     // This is where we implement support for the valid type conversions.
     // TODO: implement remaining type conversions
-    if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType ||
+    if (column.dataType() == DataTypes.IntegerType ||
         DecimalType.is32BitDecimalType(column.dataType())) {
       defColumn.readIntegers(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
@@ -417,6 +420,21 @@ public class VectorizedColumnReader {
     } else if (column.dataType() == DataTypes.ShortType) {
       defColumn.readShorts(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+    } else if (column.dataType() == DataTypes.DateType) {
+      if (rebaseDateTime) {
+        for (int i = 0; i < num; i++) {
+          if (defColumn.readInteger() == maxDefLevel) {
+            column.putInt(
+              rowId + i,
+              DateTimeUtils.rebaseJulianToGregorianDays(dataColumn.readInteger()));
+          } else {
+            column.putNull(rowId + i);
+          }
+        }
+      } else {
+        defColumn.readIntegers(
+          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+      }
     } else {
       throw constructConvertNotSupportedException(descriptor, column);
     }
@@ -425,14 +443,32 @@ public class VectorizedColumnReader {
   private void readLongBatch(int rowId, int num, WritableColumnVector column) throws IOException {
     // This is where we implement support for the valid type conversions.
     if (column.dataType() == DataTypes.LongType ||
-        DecimalType.is64BitDecimalType(column.dataType()) ||
-        originalType == OriginalType.TIMESTAMP_MICROS) {
+        DecimalType.is64BitDecimalType(column.dataType())) {
       defColumn.readLongs(
         num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+    } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
+      if (rebaseDateTime) {
+        for (int i = 0; i < num; i++) {
+          if (defColumn.readInteger() == maxDefLevel) {
+            column.putLong(
+              rowId + i,
+              DateTimeUtils.rebaseJulianToGregorianMicros(dataColumn.readLong()));
+          } else {
+            column.putNull(rowId + i);
+          }
+        }
+      } else {
+        defColumn.readLongs(
+          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+      }
     } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
       for (int i = 0; i < num; i++) {
         if (defColumn.readInteger() == maxDefLevel) {
-          column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong()));
+          long micros = DateTimeUtils.fromMillis(dataColumn.readLong());
+          if (rebaseDateTime) {
+            micros = DateTimeUtils.rebaseJulianToGregorianMicros(micros);
+          }
+          column.putLong(rowId + i, micros);
         } else {
           column.putNull(rowId + i);
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 22422c0..85a0d43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -130,6 +130,9 @@ private[parquet] class ParquetRowConverter(
     updater: ParentContainerUpdater)
   extends ParquetGroupConverter(updater) with Logging {
 
+  // Enable rebasing date/timestamp from Julian to Proleptic Gregorian calendar
+  private val rebaseDateTime = SQLConf.get.parquetRebaseDateTimeEnabled
+
   assert(
     parquetType.getFieldCount <= catalystType.length,
     s"""Field count of the Parquet schema is greater than the field count of the Catalyst schema:
@@ -271,16 +274,35 @@ private[parquet] class ParquetRowConverter(
         new ParquetStringConverter(updater)
 
       case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS =>
-        new ParquetPrimitiveConverter(updater) {
-          override def addLong(value: Long): Unit = {
-            updater.setLong(value)
+        if (rebaseDateTime) {
+          new ParquetPrimitiveConverter(updater) {
+            override def addLong(value: Long): Unit = {
+              val rebased = DateTimeUtils.rebaseJulianToGregorianMicros(value)
+              updater.setLong(rebased)
+            }
+          }
+        } else {
+          new ParquetPrimitiveConverter(updater) {
+            override def addLong(value: Long): Unit = {
+              updater.setLong(value)
+            }
           }
         }
 
       case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
-        new ParquetPrimitiveConverter(updater) {
-          override def addLong(value: Long): Unit = {
-            updater.setLong(DateTimeUtils.fromMillis(value))
+        if (rebaseDateTime) {
+          new ParquetPrimitiveConverter(updater) {
+            override def addLong(value: Long): Unit = {
+              val micros = DateTimeUtils.fromMillis(value)
+              val rebased = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
+              updater.setLong(rebased)
+            }
+          }
+        } else {
+          new ParquetPrimitiveConverter(updater) {
+            override def addLong(value: Long): Unit = {
+              updater.setLong(DateTimeUtils.fromMillis(value))
+            }
           }
         }
 
@@ -305,10 +327,17 @@ private[parquet] class ParquetRowConverter(
         }
 
       case DateType =>
-        new ParquetPrimitiveConverter(updater) {
-          override def addInt(value: Int): Unit = {
-            // DateType is not specialized in `SpecificMutableRow`, have to box it here.
-            updater.set(value.asInstanceOf[DateType#InternalType])
+        if (rebaseDateTime) {
+          new ParquetPrimitiveConverter(updater) {
+            override def addInt(value: Int): Unit = {
+              updater.set(DateTimeUtils.rebaseJulianToGregorianDays(value))
+            }
+          }
+        } else {
+          new ParquetPrimitiveConverter(updater) {
+            override def addInt(value: Int): Unit = {
+              updater.set(value)
+            }
           }
         }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index f649061..8904b35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -77,6 +77,9 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
   private val decimalBuffer =
     new Array[Byte](Decimal.minBytesForPrecision(DecimalType.MAX_PRECISION))
 
+  // Whether to rebase datetimes from Gregorian to Julian calendar in write
+  private val rebaseDateTime: Boolean = SQLConf.get.parquetRebaseDateTimeEnabled
+
   override def init(configuration: Configuration): WriteContext = {
     val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
     this.schema = StructType.fromString(schemaString)
@@ -147,6 +150,11 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
         (row: SpecializedGetters, ordinal: Int) =>
           recordConsumer.addInteger(row.getShort(ordinal))
 
+      case DateType if rebaseDateTime =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          val rebasedDays = DateTimeUtils.rebaseGregorianToJulianDays(row.getInt(ordinal))
+          recordConsumer.addInteger(rebasedDays)
+
       case IntegerType | DateType =>
         (row: SpecializedGetters, ordinal: Int) =>
           recordConsumer.addInteger(row.getInt(ordinal))
@@ -177,10 +185,21 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
               buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
               recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
 
+          case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS if rebaseDateTime =>
+            (row: SpecializedGetters, ordinal: Int) =>
+              val rebasedMicros = DateTimeUtils.rebaseGregorianToJulianMicros(row.getLong(ordinal))
+              recordConsumer.addLong(rebasedMicros)
+
           case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
             (row: SpecializedGetters, ordinal: Int) =>
               recordConsumer.addLong(row.getLong(ordinal))
 
+          case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS if rebaseDateTime =>
+            (row: SpecializedGetters, ordinal: Int) =>
+              val rebasedMicros = DateTimeUtils.rebaseGregorianToJulianMicros(row.getLong(ordinal))
+              val millis = DateTimeUtils.toMillis(rebasedMicros)
+              recordConsumer.addLong(millis)
+
           case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
             (row: SpecializedGetters, ordinal: Int) =>
               val millis = DateTimeUtils.toMillis(row.getLong(ordinal))
diff --git a/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.parquet
new file mode 100644
index 0000000..7d5cc12
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.parquet differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_v2_4.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_v2_4.snappy.parquet
new file mode 100644
index 0000000..13254bd
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_v2_4.snappy.parquet differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4.snappy.parquet
new file mode 100644
index 0000000..7d2b46e
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4.snappy.parquet differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4.snappy.parquet
new file mode 100644
index 0000000..e982545
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4.snappy.parquet differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 1550b3b..7f85fd2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import java.sql.{Date, Timestamp}
 import java.util.Locale
 
 import scala.collection.JavaConverters._
@@ -879,6 +880,71 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       assert(metaData.get(SPARK_VERSION_METADATA_KEY) === SPARK_VERSION_SHORT)
     }
   }
+
+  test("SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps") {
+    Seq(false, true).foreach { vectorized =>
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
+        withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "true") {
+          checkAnswer(
+            readResourceParquetFile("test-data/before_1582_date_v2_4.snappy.parquet"),
+            Row(java.sql.Date.valueOf("1001-01-01")))
+          checkAnswer(readResourceParquetFile(
+            "test-data/before_1582_timestamp_micros_v2_4.snappy.parquet"),
+            Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+          checkAnswer(readResourceParquetFile(
+            "test-data/before_1582_timestamp_millis_v2_4.snappy.parquet"),
+            Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123")))
+        }
+        checkAnswer(readResourceParquetFile(
+          "test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"),
+          Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+      }
+    }
+  }
+
+  test("SPARK-31159: rebasing timestamps in write") {
+    Seq(
+      ("TIMESTAMP_MILLIS", "1001-01-01 01:02:03.123", "1001-01-07 01:09:05.123"),
+      ("TIMESTAMP_MICROS", "1001-01-01 01:02:03.123456", "1001-01-07 01:09:05.123456"),
+      ("INT96", "1001-01-01 01:02:03.123456", "1001-01-01 01:02:03.123456")
+    ).foreach { case (outType, tsStr, nonRebased) =>
+      withClue(s"output type $outType") {
+        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outType) {
+          withTempPath { dir =>
+            val path = dir.getAbsolutePath
+            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "true") {
+              Seq(tsStr).toDF("tsS")
+                .select($"tsS".cast("timestamp").as("ts"))
+                .write
+                .parquet(path)
+
+              checkAnswer(spark.read.parquet(path), Row(Timestamp.valueOf(tsStr)))
+            }
+            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "false") {
+              checkAnswer(spark.read.parquet(path), Row(Timestamp.valueOf(nonRebased)))
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-31159: rebasing dates in write") {
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "true") {
+        Seq("1001-01-01").toDF("dateS")
+          .select($"dateS".cast("date").as("date"))
+          .write
+          .parquet(path)
+
+        checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-01")))
+      }
+      withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-07")))
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)

