Posted to commits@spark.apache.org by we...@apache.org on 2020/03/19 06:47:40 UTC
[spark] branch branch-3.0 updated: [SPARK-31159][SQL] Rebase date/timestamp from/to Julian calendar in parquet
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d712a7a [SPARK-31159][SQL] Rebase date/timestamp from/to Julian calendar in parquet
d712a7a is described below
commit d712a7aeef4606a2ab3102b04545cde218eb712a
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Thu Mar 19 12:49:51 2020 +0800
[SPARK-31159][SQL] Rebase date/timestamp from/to Julian calendar in parquet
The PR addresses compatibility with Spark 2.4 and earlier versions when reading/writing dates and timestamps via the Parquet datasource. Previous releases are based on a hybrid calendar (Julian + Gregorian). Since Spark 3.0, the Proleptic Gregorian calendar is used by default, see SPARK-26651. In particular, the issue shows up for dates/timestamps before 1582-10-15, where the hybrid calendar switches from the Julian to the Gregorian calendar. The same local date in different calendars is [...]
- -719164 in the Julian calendar. Spark 2.4 saves this number into Parquet as a value of the DATE type.
- -719162 in the Proleptic Gregorian calendar. Spark 3.0 saves this number as a date value.
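To make the numbers above concrete: -719162 is the Proleptic Gregorian epoch day of the local date 0001-01-01, and counting the same local date in the hybrid calendar gives -719164. The minimal sketch below is an illustration, not Spark code: the object `EpochDaysSketch` and its methods are hypothetical, and it uses only `java.time` and `java.util.Calendar` in UTC. It reproduces both counts and checks them against the usual closed-form offset between the Julian and Gregorian calendars.

```scala
import java.time.LocalDate
import java.util.{Calendar, TimeZone}

// Hypothetical helper object, not part of Spark.
object EpochDaysSketch {
  private val MillisPerDay = 86400000L

  // Days since 1970-01-01 of a local date label in the hybrid Julian + Gregorian
  // calendar ("gregory" in java.util.Calendar), evaluated in UTC.
  def hybridEpochDay(year: Int, month: Int, day: Int): Long = {
    val cal = new Calendar.Builder()
      .setCalendarType("gregory")
      .setTimeZone(TimeZone.getTimeZone("UTC"))
      .setDate(year, month - 1, day) // java.util.Calendar months are 0-based
      .build()
    Math.floorDiv(cal.getTimeInMillis, MillisPerDay)
  }

  // Days since 1970-01-01 of the same label in the Proleptic Gregorian calendar.
  def gregorianEpochDay(year: Int, month: Int, day: Int): Long =
    LocalDate.of(year, month, day).toEpochDay

  // Closed-form offset (hybrid minus proleptic) for AD dates before 1582-10-15:
  // one day per Julian-only leap day (century years not divisible by 400), minus 2
  // because both calendars coincide between 0200-03-01 and 0300-02-28.
  // January/February dates have not yet reached that year's leap day.
  def offsetDays(year: Int, month: Int): Long = {
    val y = if (month <= 2) year - 1 else year
    Math.floorDiv(y, 100) - Math.floorDiv(y, 400) - 2L
  }

  def main(args: Array[String]): Unit = {
    println(gregorianEpochDay(1, 1, 1)) // -719162
    println(hybridEpochDay(1, 1, 1))    // -719164
  }
}
```

The same offset rule gives +6 days for January 1001 and +10 days for 1582 before the cutover.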
According to the Parquet spec, Parquet dates and timestamps of the `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS` output types should be based on the Proleptic Gregorian calendar, but `INT96` timestamps should be stored as Julian days. Since version 3.0, Spark conforms to the spec. For backward compatibility with previous versions, the PR proposes rebasing between the Proleptic Gregorian calendar and the hybrid one under the SQL config:
```
spark.sql.legacy.parquet.rebaseDateTime.enabled
```
which is `false` by default, so no rebasing is performed unless it is enabled.
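The scope of the flag can be summarized as a small decision table: per the implementation notes of this PR, DATE values and `TIMESTAMP_MICROS`/`TIMESTAMP_MILLIS` timestamps are rebased only when the flag is on, while `INT96` timestamps are always rebased. The sketch below only encodes that table; `ParquetDateTimeKind`, its cases and `RebaseScope.needsRebase` are hypothetical names, not Spark APIs.

```scala
// Hypothetical model of the Parquet date/time values touched by this PR.
sealed trait ParquetDateTimeKind
case object DateDays extends ParquetDateTimeKind        // DATE, days since the epoch
case object TimestampMicros extends ParquetDateTimeKind // TIMESTAMP_MICROS
case object TimestampMillis extends ParquetDateTimeKind // TIMESTAMP_MILLIS
case object TimestampInt96 extends ParquetDateTimeKind  // INT96

object RebaseScope {
  // `legacyRebaseEnabled` stands for spark.sql.legacy.parquet.rebaseDateTime.enabled.
  def needsRebase(kind: ParquetDateTimeKind, legacyRebaseEnabled: Boolean): Boolean =
    kind match {
      case TimestampInt96 => true // always rebased, independently of the flag
      case DateDays | TimestampMicros | TimestampMillis => legacyRebaseEnabled
    }
}
```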
The details of the implementation:
1. Added 2 methods to `DateTimeUtils` for rebasing microseconds. `rebaseGregorianToJulianMicros()` builds a local timestamp in the Proleptic Gregorian calendar, extracts the date-time fields `year`, `month`, ..., `second fraction` from it, and uses them to build another local timestamp based on the hybrid calendar (using the `java.util.Calendar` API). After that, it calculates the number of microseconds since the epoch from the resulting local timestamp. The function performs the c [...]
2. Added 2 methods to `DateTimeUtils` for rebasing days. `rebaseGregorianToJulianDays()` builds a local date from the passed number of days since the epoch in the Proleptic Gregorian calendar, interprets the resulting date as a local date in the hybrid calendar, and gets the number of days since the epoch from that local date. The conversion is performed in the `UTC` time zone because it does not depend on time zones, and `UTC` is selected to avoid rounding issues of casti [...]
3. Use `rebaseGregorianToJulianMicros()` and `rebaseGregorianToJulianDays()` while saving timestamps/dates to parquet files if the SQL config is on.
4. Use `rebaseJulianToGregorianMicros()` and `rebaseJulianToGregorianDays()` while loading timestamps/dates from parquet files if the SQL config is on.
5. The SQL config `spark.sql.legacy.parquet.rebaseDateTime.enabled` controls conversions of dates and of timestamps of the `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS` types (see the SQL config `spark.sql.parquet.outputTimestampType`).
6. The rebasing is always performed for `INT96` timestamps, independently of `spark.sql.legacy.parquet.rebaseDateTime.enabled`.
7. Added rebasing support to the vectorized Parquet reader, see the SQL config `spark.sql.parquet.enableVectorizedReader`.
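The microsecond rebasing of item 1, together with its inverse used on the read path, can be sketched without Spark as follows. Unlike the diff, which resolves local date-times in the JVM default time zone, this sketch pins both calendars to UTC so the result does not depend on the machine. `MicrosRebaseSketch` and its methods are hypothetical names, and only AD dates are handled.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.temporal.ChronoField
import java.util.{Calendar, TimeZone}

// Hypothetical, UTC-only sketch of the micros rebasing; not Spark's implementation.
object MicrosRebaseSketch {
  private val MicrosPerSecond = 1000000L

  // Hybrid Julian + Gregorian calendar in UTC.
  private def hybridCalendar(): Calendar.Builder =
    new Calendar.Builder().setCalendarType("gregory").setTimeZone(TimeZone.getTimeZone("UTC"))

  // Proleptic Gregorian micros -> micros of the same local date-time in the hybrid calendar.
  def gregorianToJulianMicros(micros: Long): Long = {
    val seconds = Math.floorDiv(micros, MicrosPerSecond)
    val microOfSecond = Math.floorMod(micros, MicrosPerSecond)
    val ldt = LocalDateTime.ofEpochSecond(seconds, (microOfSecond * 1000).toInt, ZoneOffset.UTC)
    val cal = hybridCalendar()
      .setDate(ldt.getYear, ldt.getMonthValue - 1, ldt.getDayOfMonth)
      .setTimeOfDay(ldt.getHour, ldt.getMinute, ldt.getSecond)
      .build()
    cal.getTimeInMillis * 1000 + microOfSecond // re-attach the sub-second part
  }

  // Hybrid-calendar micros -> micros of the same local date-time in Proleptic Gregorian.
  def julianToGregorianMicros(micros: Long): Long = {
    val cal = hybridCalendar().setInstant(Math.floorDiv(micros, 1000L)).build()
    val microOfSecond = Math.floorMod(micros, MicrosPerSecond)
    val ldt = LocalDateTime.of(
      cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH),
      cal.get(Calendar.HOUR_OF_DAY), cal.get(Calendar.MINUTE), cal.get(Calendar.SECOND),
      (microOfSecond * 1000).toInt)
    ldt.toEpochSecond(ZoneOffset.UTC) * MicrosPerSecond + microOfSecond
  }

  // Micros since the epoch of a Proleptic Gregorian local date-time taken in UTC.
  def utcMicros(ldt: LocalDateTime): Long =
    ldt.toEpochSecond(ZoneOffset.UTC) * MicrosPerSecond + ldt.get(ChronoField.MICRO_OF_SECOND)
}
```

In UTC, rebasing `1001-01-01 01:02:03.123456` yields a number exactly six days larger; read back as Proleptic Gregorian without rebasing, it lands on 1001-01-07, the date that Spark 3.0.0-preview2 shows for the Spark 2.4.5 file in this PR.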
- For backward compatibility with Spark 2.4 and earlier versions. The changes allow users to read dates/timestamps saved by previous versions and get the same results. Also, after the changes, users can enable the rebasing in write and save dates/timestamps that can be loaded correctly by Spark 2.4 and earlier versions.
- It fixes the bug of incorrectly saving/loading timestamps of the `INT96` type.
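`INT96` timestamps are written as 12 little-endian bytes: 8 bytes of nanoseconds within the day followed by 4 bytes of Julian day number (see `toJulianDay`/`fromJulianDay` and `ParquetWriteSupport` in the diff). A minimal sketch of that layout follows, without the calendar rebasing that this PR adds around it (`rebaseGregorianToJulianMicros()` before encoding, `rebaseJulianToGregorianMicros()` after decoding). `Int96Sketch` is a hypothetical name; `2440588` is the Julian day number of 1970-01-01 (Spark's `JULIAN_DAY_OF_EPOCH`).

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical sketch of the INT96 byte layout; no calendar rebasing is applied here.
object Int96Sketch {
  val JulianDayOfEpoch = 2440588L // Julian day number of 1970-01-01
  val MicrosPerDay = 86400L * 1000000L

  // Like toJulianDay in the diff, assumes non-negative Julian micros
  // (the timestamp is not before the start of the Julian day count).
  def encode(micros: Long): Array[Byte] = {
    val julianMicros = micros + JulianDayOfEpoch * MicrosPerDay
    val day = julianMicros / MicrosPerDay
    val nanosOfDay = (julianMicros % MicrosPerDay) * 1000L
    val buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
    buf.putLong(nanosOfDay).putInt(day.toInt) // nanos of day first, then the Julian day
    buf.array()
  }

  def decode(bytes: Array[Byte]): Long = {
    val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
    val nanosOfDay = buf.getLong
    val day = buf.getInt
    (day - JulianDayOfEpoch) * MicrosPerDay + nanosOfDay / 1000L
  }
}
```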
This PR introduces a user-facing change. For example, the timestamp `1001-01-01 01:02:03.123456` saved by Spark 2.4.5 as `TIMESTAMP_MICROS` is interpreted differently by Spark 3.0.0-preview2:
```scala
scala> spark.read.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_micros").show(false)
+--------------------------+
|ts |
+--------------------------+
|1001-01-07 11:32:20.123456|
+--------------------------+
```
After the changes, with rebasing enabled:
```scala
scala> spark.conf.set("spark.sql.legacy.parquet.rebaseDateTime.enabled", true)
scala> spark.read.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_micros").show(false)
+--------------------------+
|ts |
+--------------------------+
|1001-01-01 01:02:03.123456|
+--------------------------+
```
1. Added tests to `ParquetIOSuite` that check rebasing in read for both the regular and the vectorized Parquet readers. The tests read back Parquet files saved by Spark 2.4.5 via:
```shell
$ export TZ="America/Los_Angeles"
```
```scala
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
scala> val df = Seq("1001-01-01").toDF("dateS").select($"dateS".cast("date").as("date"))
df: org.apache.spark.sql.DataFrame = [date: date]
scala> df.write.parquet("/Users/maxim/tmp/before_1582/2_4_5_date")
scala> val df = Seq("1001-01-01 01:02:03.123456").toDF("tsS").select($"tsS".cast("timestamp").as("ts"))
df: org.apache.spark.sql.DataFrame = [ts: timestamp]
scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
scala> df.write.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_micros")
scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
scala> df.write.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_millis")
scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "INT96")
scala> df.write.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_int96")
```
2. Manually checked the write code path: saved dates/timestamps (`TIMESTAMP_MICROS`, `TIMESTAMP_MILLIS`, `INT96`) with Spark 3.1.0-SNAPSHOT (after the changes):
```shell
$ export TZ="America/Los_Angeles"
```
```scala
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
scala> spark.conf.set("spark.sql.legacy.parquet.rebaseDateTime.enabled", true)
scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
scala> val df = Seq(("1001-01-01", "1001-01-01 01:02:03.123456")).toDF("dateS", "tsS").select($"dateS".cast("date").as("d"), $"tsS".cast("timestamp").as("ts"))
df: org.apache.spark.sql.DataFrame = [d: date, ts: timestamp]
scala> df.write.parquet("/Users/maxim/tmp/before_1582/3_0_0_micros")
scala> spark.read.parquet("/Users/maxim/tmp/before_1582/3_0_0_micros").show(false)
+----------+--------------------------+
|d |ts |
+----------+--------------------------+
|1001-01-01|1001-01-01 01:02:03.123456|
+----------+--------------------------+
```
Read the saved date/timestamp with Spark 2.4.5:
```scala
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
scala> spark.read.parquet("/Users/maxim/tmp/before_1582/3_0_0_micros").show(false)
+----------+--------------------------+
|d |ts |
+----------+--------------------------+
|1001-01-01|1001-01-01 01:02:03.123456|
+----------+--------------------------+
```
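The manual check above relies on the two day-rebasing directions being inverses: with rebasing enabled, Spark 3.0 writes hybrid-calendar day numbers, which Spark 2.4 reads natively, and the read path of this PR maps them back. A UTC-based sketch of both directions follows; `DaysRoundTripSketch` is a hypothetical name that mirrors, but is not, `rebaseGregorianToJulianDays()`/`rebaseJulianToGregorianDays()`. The round trip is expected to hold for dates outside 1582-10-05..1582-10-14, which exist in the Proleptic Gregorian calendar but not in the hybrid one; the sketch does not model how those dates are handled.

```scala
import java.time.LocalDate
import java.util.{Calendar, TimeZone}

// Hypothetical sketch of the day rebasing in both directions (UTC, AD dates only).
object DaysRoundTripSketch {
  private val MillisPerDay = 86400000L

  private def hybrid(): Calendar.Builder =
    new Calendar.Builder().setCalendarType("gregory").setTimeZone(TimeZone.getTimeZone("UTC"))

  // Write path with rebasing on: Proleptic Gregorian days -> hybrid-calendar days.
  def gregorianToJulianDays(days: Int): Int = {
    val ld = LocalDate.ofEpochDay(days)
    val cal = hybrid().setDate(ld.getYear, ld.getMonthValue - 1, ld.getDayOfMonth).build()
    Math.toIntExact(Math.floorDiv(cal.getTimeInMillis, MillisPerDay))
  }

  // Read path with rebasing on: hybrid-calendar days -> Proleptic Gregorian days.
  def julianToGregorianDays(days: Int): Int = {
    val cal = hybrid().setInstant(days * MillisPerDay).build()
    val ld = LocalDate.of(
      cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH))
    Math.toIntExact(ld.toEpochDay)
  }
}
```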
Closes #27915 from MaxGekk/rebase-parquet-datetime.
Authored-by: Maxim Gekk <ma...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit bb295d80e37440e6ee67a00bc09df2e9ff6e4e46)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/catalyst/util/DateTimeUtils.scala | 106 ++++++++++++++++++++-
.../org/apache/spark/sql/internal/SQLConf.scala | 15 +++
.../sql/catalyst/util/DateTimeUtilsSuite.scala | 60 ++++++++++++
.../parquet/VectorizedColumnReader.java | 44 ++++++++-
.../datasources/parquet/ParquetRowConverter.scala | 49 ++++++++--
.../datasources/parquet/ParquetWriteSupport.scala | 19 ++++
.../test-data/before_1582_date_v2_4.snappy.parquet | Bin 0 -> 398 bytes
...before_1582_timestamp_int96_v2_4.snappy.parquet | Bin 0 -> 494 bytes
...efore_1582_timestamp_micros_v2_4.snappy.parquet | Bin 0 -> 436 bytes
...efore_1582_timestamp_millis_v2_4.snappy.parquet | Bin 0 -> 436 bytes
.../datasources/parquet/ParquetIOSuite.scala | 66 +++++++++++++
11 files changed, 342 insertions(+), 17 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index a8c55e7..7f5babf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.time._
import java.time.temporal.{ChronoField, ChronoUnit, IsoFields}
-import java.util.{Locale, TimeZone}
+import java.util.{Calendar, Locale, TimeZone}
import java.util.concurrent.TimeUnit._
import scala.util.control.NonFatal
@@ -150,7 +150,9 @@ object DateTimeUtils {
def fromJulianDay(day: Int, nanoseconds: Long): SQLTimestamp = {
// use Long to avoid rounding errors
val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY
- SECONDS.toMicros(seconds) + NANOSECONDS.toMicros(nanoseconds)
+ val micros = SECONDS.toMicros(seconds) + NANOSECONDS.toMicros(nanoseconds)
+ val rebased = rebaseJulianToGregorianMicros(micros)
+ rebased
}
/**
@@ -159,7 +161,7 @@ object DateTimeUtils {
* Note: support timestamp since 4717 BC (without negative nanoseconds, compatible with Hive).
*/
def toJulianDay(us: SQLTimestamp): (Int, Long) = {
- val julian_us = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
+ val julian_us = rebaseGregorianToJulianMicros(us) + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
val day = julian_us / MICROS_PER_DAY
val micros = julian_us % MICROS_PER_DAY
(day.toInt, MICROSECONDS.toNanos(micros))
@@ -942,4 +944,102 @@ object DateTimeUtils {
val days = period.getDays
new CalendarInterval(months, days, 0)
}
+
+ /**
+ * Converts the given microseconds to a local date-time in UTC time zone in Proleptic Gregorian
+ * calendar, interprets the result as a local date-time in Julian calendar in UTC time zone.
+ * And takes microseconds since the epoch from the Julian timestamp.
+ *
+ * @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'.
+ * @return The rebased microseconds since the epoch in Julian calendar.
+ */
+ def rebaseGregorianToJulianMicros(micros: Long): Long = {
+ val ldt = microsToInstant(micros).atZone(ZoneId.systemDefault).toLocalDateTime
+ val cal = new Calendar.Builder()
+ // `gregory` is a hybrid calendar that supports both
+ // the Julian and Gregorian calendar systems
+ .setCalendarType("gregory")
+ .setDate(ldt.getYear, ldt.getMonthValue - 1, ldt.getDayOfMonth)
+ .setTimeOfDay(ldt.getHour, ldt.getMinute, ldt.getSecond)
+ .build()
+ fromMillis(cal.getTimeInMillis) + ldt.get(ChronoField.MICRO_OF_SECOND)
+ }
+
+ /**
+ * Converts the given microseconds to a local date-time in UTC time zone in Julian calendar,
+ * interprets the result as a local date-time in Proleptic Gregorian calendar in UTC time zone.
+ * And takes microseconds since the epoch from the Gregorian timestamp.
+ *
+ * @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'.
+ * @return The rebased microseconds since the epoch in Proleptic Gregorian calendar.
+ */
+ def rebaseJulianToGregorianMicros(micros: Long): Long = {
+ val cal = new Calendar.Builder()
+ // `gregory` is a hybrid calendar that supports both
+ // the Julian and Gregorian calendar systems
+ .setCalendarType("gregory")
+ .setInstant(toMillis(micros))
+ .build()
+ val localDateTime = LocalDateTime.of(
+ cal.get(Calendar.YEAR),
+ cal.get(Calendar.MONTH) + 1,
+ cal.get(Calendar.DAY_OF_MONTH),
+ cal.get(Calendar.HOUR_OF_DAY),
+ cal.get(Calendar.MINUTE),
+ cal.get(Calendar.SECOND),
+ (Math.floorMod(micros, MICROS_PER_SECOND) * NANOS_PER_MICROS).toInt)
+ instantToMicros(localDateTime.atZone(ZoneId.systemDefault).toInstant)
+ }
+
+ /**
+ * Converts the given number of days since the epoch day 1970-01-01 to
+ * a local date in Julian calendar, interprets the result as a local
+ * date in Proleptic Gregorian calendar, and take the number of days
+ * since the epoch from the Gregorian date.
+ *
+ * @param days The number of days since the epoch in Julian calendar.
+ * @return The rebased number of days in Gregorian calendar.
+ */
+ def rebaseJulianToGregorianDays(days: Int): Int = {
+ val utcCal = new Calendar.Builder()
+ // `gregory` is a hybrid calendar that supports both
+ // the Julian and Gregorian calendar systems
+ .setCalendarType("gregory")
+ .setTimeZone(TimeZoneUTC)
+ .setInstant(Math.multiplyExact(days, MILLIS_PER_DAY))
+ .build()
+ val localDate = LocalDate.of(
+ utcCal.get(Calendar.YEAR),
+ utcCal.get(Calendar.MONTH) + 1,
+ utcCal.get(Calendar.DAY_OF_MONTH))
+ Math.toIntExact(localDate.toEpochDay)
+ }
+
+ /**
+ * Rebasing days since the epoch to store the same number of days
+ * as by Spark 2.4 and earlier versions. Spark 3.0 switched to
+ * Proleptic Gregorian calendar (see SPARK-26651), and as a consequence of that,
+ * this affects dates before 1582-10-15. Spark 2.4 and earlier versions use
+ * Julian calendar for dates before 1582-10-15. So, the same local date may
+ * be mapped to different number of days since the epoch in different calendars.
+ *
+ * For example:
+ * Proleptic Gregorian calendar: 1582-01-01 -> -141714
+ * Julian calendar: 1582-01-01 -> -141704
+ * The code below converts -141714 to -141704.
+ *
+ * @param days The number of days since the epoch 1970-01-01. It can be negative.
+ * @return The rebased number of days since the epoch in Julian calendar.
+ */
+ def rebaseGregorianToJulianDays(days: Int): Int = {
+ val localDate = LocalDate.ofEpochDay(days)
+ val utcCal = new Calendar.Builder()
+ // `gregory` is a hybrid calendar that supports both
+ // the Julian and Gregorian calendar systems
+ .setCalendarType("gregory")
+ .setTimeZone(TimeZoneUTC)
+ .setDate(localDate.getYear, localDate.getMonthValue - 1, localDate.getDayOfMonth)
+ .build()
+ Math.toIntExact(Math.floorDiv(utcCal.getTimeInMillis, MILLIS_PER_DAY))
+ }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1331350..a47f7d9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2232,6 +2232,19 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val LEGACY_PARQUET_REBASE_DATETIME =
+ buildConf("spark.sql.legacy.parquet.rebaseDateTime.enabled")
+ .internal()
+ .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
+ "to the hybrid calendar (Julian + Gregorian) in write and " +
+ "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
+ "The rebasing is performed by converting micros/millis/days to " +
+ "a local date/timestamp in the source calendar, interpreting the resulted date/" +
+ "timestamp in the target calendar, and getting the number of micros/millis/days " +
+ "since the epoch 1970-01-01 00:00:00Z.")
+ .booleanConf
+ .createWithDefault(false)
+
/**
* Holds information about keys that have been deprecated.
*
@@ -2807,6 +2820,8 @@ class SQLConf extends Serializable with Logging {
def csvFilterPushDown: Boolean = getConf(CSV_FILTER_PUSHDOWN_ENABLED)
+ def parquetRebaseDateTimeEnabled: Boolean = getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME)
+
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 0df395b..f2ad9e6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -669,4 +669,64 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
assert(toDate("tomorrow CET ", zoneId).get === today + 1)
}
}
+
+ test("rebase julian to/from gregorian micros") {
+ outstandingTimezones.foreach { timeZone =>
+ withDefaultTimeZone(timeZone) {
+ Seq(
+ "0001-01-01 01:02:03.654321",
+ "1000-01-01 03:02:01.123456",
+ "1582-10-04 00:00:00.000000",
+ "1582-10-15 00:00:00.999999", // Gregorian cutover day
+ "1883-11-10 00:00:00.000000", // America/Los_Angeles -7:52:58 zone offset
+ "1883-11-20 00:00:00.000000", // America/Los_Angeles -08:00 zone offset
+ "1969-12-31 11:22:33.000100",
+ "1970-01-01 00:00:00.000001", // The epoch day
+ "2020-03-14 09:33:01.500000").foreach { ts =>
+ withClue(s"time zone = ${timeZone.getID} ts = $ts") {
+ val julianTs = Timestamp.valueOf(ts)
+ val julianMicros = fromMillis(julianTs.getTime) +
+ ((julianTs.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS)
+ val gregorianMicros = instantToMicros(LocalDateTime.parse(ts.replace(' ', 'T'))
+ .atZone(timeZone.toZoneId)
+ .toInstant)
+
+ assert(rebaseJulianToGregorianMicros(julianMicros) === gregorianMicros)
+ assert(rebaseGregorianToJulianMicros(gregorianMicros) === julianMicros)
+ }
+ }
+ }
+ }
+ }
+
+ test("rebase gregorian to/from julian days") {
+ // millisToDays() and fromJavaDate() are taken from Spark 2.4
+ def millisToDays(millisUtc: Long, timeZone: TimeZone): Int = {
+ val millisLocal = millisUtc + timeZone.getOffset(millisUtc)
+ Math.floor(millisLocal.toDouble / MILLIS_PER_DAY).toInt
+ }
+ def fromJavaDate(date: Date): Int = {
+ millisToDays(date.getTime, defaultTimeZone())
+ }
+ outstandingTimezones.foreach { timeZone =>
+ withDefaultTimeZone(timeZone) {
+ Seq(
+ "0001-01-01",
+ "1000-01-01",
+ "1582-10-04",
+ "1582-10-15", // Gregorian cutover day
+ "1883-11-10", // America/Los_Angeles -7:52:58 zone offset
+ "1883-11-20", // America/Los_Angeles -08:00 zone offset
+ "1969-12-31",
+ "1970-01-01", // The epoch day
+ "2020-03-14").foreach { date =>
+ val julianDays = fromJavaDate(Date.valueOf(date))
+ val gregorianDays = localDateToDays(LocalDate.parse(date))
+
+ assert(rebaseGregorianToJulianDays(gregorianDays) === julianDays)
+ assert(rebaseJulianToGregorianDays(julianDays) === gregorianDays)
+ }
+ }
+ }
+ }
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 3294655..f9b6139 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -37,6 +37,7 @@ import org.apache.parquet.schema.PrimitiveType;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DecimalType;
@@ -101,6 +102,7 @@ public class VectorizedColumnReader {
// The timezone conversion to apply to int96 timestamps. Null if no conversion.
private final ZoneId convertTz;
private static final ZoneId UTC = ZoneOffset.UTC;
+ private final boolean rebaseDateTime;
public VectorizedColumnReader(
ColumnDescriptor descriptor,
@@ -129,6 +131,7 @@ public class VectorizedColumnReader {
if (totalValueCount == 0) {
throw new IOException("totalValueCount == 0");
}
+ this.rebaseDateTime = SQLConf.get().parquetRebaseDateTimeEnabled();
}
/**
@@ -407,7 +410,7 @@ public class VectorizedColumnReader {
private void readIntBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
- if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType ||
+ if (column.dataType() == DataTypes.IntegerType ||
DecimalType.is32BitDecimalType(column.dataType())) {
defColumn.readIntegers(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
@@ -417,6 +420,21 @@ public class VectorizedColumnReader {
} else if (column.dataType() == DataTypes.ShortType) {
defColumn.readShorts(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ } else if (column.dataType() == DataTypes.DateType ) {
+ if (rebaseDateTime) {
+ for (int i = 0; i < num; i++) {
+ if (defColumn.readInteger() == maxDefLevel) {
+ column.putInt(
+ rowId + i,
+ DateTimeUtils.rebaseJulianToGregorianDays(dataColumn.readInteger()));
+ } else {
+ column.putNull(rowId + i);
+ }
+ }
+ } else {
+ defColumn.readIntegers(
+ num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ }
} else {
throw constructConvertNotSupportedException(descriptor, column);
}
@@ -425,14 +443,32 @@ public class VectorizedColumnReader {
private void readLongBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
if (column.dataType() == DataTypes.LongType ||
- DecimalType.is64BitDecimalType(column.dataType()) ||
- originalType == OriginalType.TIMESTAMP_MICROS) {
+ DecimalType.is64BitDecimalType(column.dataType())) {
defColumn.readLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
+ if (rebaseDateTime) {
+ for (int i = 0; i < num; i++) {
+ if (defColumn.readInteger() == maxDefLevel) {
+ column.putLong(
+ rowId + i,
+ DateTimeUtils.rebaseJulianToGregorianMicros(dataColumn.readLong()));
+ } else {
+ column.putNull(rowId + i);
+ }
+ }
+ } else {
+ defColumn.readLongs(
+ num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ }
} else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
- column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong()));
+ long micros = DateTimeUtils.fromMillis(dataColumn.readLong());
+ if (rebaseDateTime) {
+ micros = DateTimeUtils.rebaseJulianToGregorianMicros(micros);
+ }
+ column.putLong(rowId + i, micros);
} else {
column.putNull(rowId + i);
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 22422c0..85a0d43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -130,6 +130,9 @@ private[parquet] class ParquetRowConverter(
updater: ParentContainerUpdater)
extends ParquetGroupConverter(updater) with Logging {
+ // Enable rebasing date/timestamp from Julian to Proleptic Gregorian calendar
+ private val rebaseDateTime = SQLConf.get.parquetRebaseDateTimeEnabled
+
assert(
parquetType.getFieldCount <= catalystType.length,
s"""Field count of the Parquet schema is greater than the field count of the Catalyst schema:
@@ -271,16 +274,35 @@ private[parquet] class ParquetRowConverter(
new ParquetStringConverter(updater)
case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS =>
- new ParquetPrimitiveConverter(updater) {
- override def addLong(value: Long): Unit = {
- updater.setLong(value)
+ if (rebaseDateTime) {
+ new ParquetPrimitiveConverter(updater) {
+ override def addLong(value: Long): Unit = {
+ val rebased = DateTimeUtils.rebaseJulianToGregorianMicros(value)
+ updater.setLong(rebased)
+ }
+ }
+ } else {
+ new ParquetPrimitiveConverter(updater) {
+ override def addLong(value: Long): Unit = {
+ updater.setLong(value)
+ }
}
}
case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
- new ParquetPrimitiveConverter(updater) {
- override def addLong(value: Long): Unit = {
- updater.setLong(DateTimeUtils.fromMillis(value))
+ if (rebaseDateTime) {
+ new ParquetPrimitiveConverter(updater) {
+ override def addLong(value: Long): Unit = {
+ val micros = DateTimeUtils.fromMillis(value)
+ val rebased = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
+ updater.setLong(rebased)
+ }
+ }
+ } else {
+ new ParquetPrimitiveConverter(updater) {
+ override def addLong(value: Long): Unit = {
+ updater.setLong(DateTimeUtils.fromMillis(value))
+ }
}
}
@@ -305,10 +327,17 @@ private[parquet] class ParquetRowConverter(
}
case DateType =>
- new ParquetPrimitiveConverter(updater) {
- override def addInt(value: Int): Unit = {
- // DateType is not specialized in `SpecificMutableRow`, have to box it here.
- updater.set(value.asInstanceOf[DateType#InternalType])
+ if (rebaseDateTime) {
+ new ParquetPrimitiveConverter(updater) {
+ override def addInt(value: Int): Unit = {
+ updater.set(DateTimeUtils.rebaseJulianToGregorianDays(value))
+ }
+ }
+ } else {
+ new ParquetPrimitiveConverter(updater) {
+ override def addInt(value: Int): Unit = {
+ updater.set(value)
+ }
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index f649061..8904b35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -77,6 +77,9 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
private val decimalBuffer =
new Array[Byte](Decimal.minBytesForPrecision(DecimalType.MAX_PRECISION))
+ // Whether to rebase datetimes from Gregorian to Julian calendar in write
+ private val rebaseDateTime: Boolean = SQLConf.get.parquetRebaseDateTimeEnabled
+
override def init(configuration: Configuration): WriteContext = {
val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
this.schema = StructType.fromString(schemaString)
@@ -147,6 +150,11 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
(row: SpecializedGetters, ordinal: Int) =>
recordConsumer.addInteger(row.getShort(ordinal))
+ case DateType if rebaseDateTime =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ val rebasedDays = DateTimeUtils.rebaseGregorianToJulianDays(row.getInt(ordinal))
+ recordConsumer.addInteger(rebasedDays)
+
case IntegerType | DateType =>
(row: SpecializedGetters, ordinal: Int) =>
recordConsumer.addInteger(row.getInt(ordinal))
@@ -177,10 +185,21 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
+ case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS if rebaseDateTime =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ val rebasedMicros = DateTimeUtils.rebaseGregorianToJulianMicros(row.getLong(ordinal))
+ recordConsumer.addLong(rebasedMicros)
+
case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
(row: SpecializedGetters, ordinal: Int) =>
recordConsumer.addLong(row.getLong(ordinal))
+ case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS if rebaseDateTime =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ val rebasedMicros = DateTimeUtils.rebaseGregorianToJulianMicros(row.getLong(ordinal))
+ val millis = DateTimeUtils.toMillis(rebasedMicros)
+ recordConsumer.addLong(millis)
+
case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
(row: SpecializedGetters, ordinal: Int) =>
val millis = DateTimeUtils.toMillis(row.getLong(ordinal))
diff --git a/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.parquet
new file mode 100644
index 0000000..7d5cc12
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.parquet differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_v2_4.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_v2_4.snappy.parquet
new file mode 100644
index 0000000..13254bd
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_v2_4.snappy.parquet differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4.snappy.parquet
new file mode 100644
index 0000000..7d2b46e
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4.snappy.parquet differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4.snappy.parquet
new file mode 100644
index 0000000..e982545
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4.snappy.parquet differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 1550b3b..7f85fd2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.datasources.parquet
+import java.sql.{Date, Timestamp}
import java.util.Locale
import scala.collection.JavaConverters._
@@ -879,6 +880,71 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
assert(metaData.get(SPARK_VERSION_METADATA_KEY) === SPARK_VERSION_SHORT)
}
}
+
+ test("SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps") {
+ Seq(false, true).foreach { vectorized =>
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
+ withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "true") {
+ checkAnswer(
+ readResourceParquetFile("test-data/before_1582_date_v2_4.snappy.parquet"),
+ Row(java.sql.Date.valueOf("1001-01-01")))
+ checkAnswer(readResourceParquetFile(
+ "test-data/before_1582_timestamp_micros_v2_4.snappy.parquet"),
+ Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+ checkAnswer(readResourceParquetFile(
+ "test-data/before_1582_timestamp_millis_v2_4.snappy.parquet"),
+ Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123")))
+ }
+ checkAnswer(readResourceParquetFile(
+ "test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"),
+ Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+ }
+ }
+ }
+
+ test("SPARK-31159: rebasing timestamps in write") {
+ Seq(
+ ("TIMESTAMP_MILLIS", "1001-01-01 01:02:03.123", "1001-01-07 01:09:05.123"),
+ ("TIMESTAMP_MICROS", "1001-01-01 01:02:03.123456", "1001-01-07 01:09:05.123456"),
+ ("INT96", "1001-01-01 01:02:03.123456", "1001-01-01 01:02:03.123456")
+ ).foreach { case (outType, tsStr, nonRebased) =>
+ withClue(s"output type $outType") {
+ withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outType) {
+ withTempPath { dir =>
+ val path = dir.getAbsolutePath
+ withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "true") {
+ Seq(tsStr).toDF("tsS")
+ .select($"tsS".cast("timestamp").as("ts"))
+ .write
+ .parquet(path)
+
+ checkAnswer(spark.read.parquet(path), Row(Timestamp.valueOf(tsStr)))
+ }
+ withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "false") {
+ checkAnswer(spark.read.parquet(path), Row(Timestamp.valueOf(nonRebased)))
+ }
+ }
+ }
+ }
+ }
+ }
+
+ test("SPARK-31159: rebasing dates in write") {
+ withTempPath { dir =>
+ val path = dir.getAbsolutePath
+ withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "true") {
+ Seq("1001-01-01").toDF("dateS")
+ .select($"dateS".cast("date").as("date"))
+ .write
+ .parquet(path)
+
+ checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-01")))
+ }
+ withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "false") {
+ checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-07")))
+ }
+ }
+ }
}
class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)