Posted to commits@spark.apache.org by we...@apache.org on 2020/05/11 05:03:12 UTC
[spark] branch branch-3.0 updated: [SPARK-31672][SQL] Fix loading of timestamps before 1582-10-15 from dictionary encoded Parquet columns
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 5c6a4fc [SPARK-31672][SQL] Fix loading of timestamps before 1582-10-15 from dictionary encoded Parquet columns
5c6a4fc is described below
commit 5c6a4fc8a71fcca9110c8c18ebd44d935514fcc1
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Mon May 11 04:58:08 2020 +0000
[SPARK-31672][SQL] Fix loading of timestamps before 1582-10-15 from dictionary encoded Parquet columns
Modified the `decodeDictionaryIds()` method of `VectorizedColumnReader` to handle `TimestampType` specially when the passed parameter `rebaseDateTime` is true. In that case, decoded milliseconds/microseconds are rebased from the hybrid calendar to the Proleptic Gregorian calendar using `RebaseDateTime.rebaseJulianToGregorianMicros()`.
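For intuition, the rebase step can be sketched as below. This is a minimal illustration only, not Spark's `RebaseDateTime` (which rebases in the session time zone and uses precomputed switch tables); it rebases in UTC: `java.util.Calendar` uses the hybrid calendar (Julian before 1582-10-15), so its fields recover the original wall-clock date-time, which is then re-encoded with `java.time` on the Proleptic Gregorian timeline that Spark 3.0 uses internally.
```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.util.{Calendar, TimeZone}

// Sketch only: rebases in UTC, while Spark rebases in the session time zone.
def rebaseJulianToGregorianMicrosSketch(julianMicros: Long): Long = {
  val millis = Math.floorDiv(julianMicros, 1000L)
  val microsOfMilli = Math.floorMod(julianMicros, 1000L)
  // Recover the wall-clock fields through the hybrid (Julian) calendar.
  val cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
  cal.setTimeInMillis(millis)
  // Re-encode the same wall-clock fields in the Proleptic Gregorian calendar.
  val ldt = LocalDateTime.of(
    cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH),
    cal.get(Calendar.HOUR_OF_DAY), cal.get(Calendar.MINUTE), cal.get(Calendar.SECOND))
  ldt.toEpochSecond(ZoneOffset.UTC) * 1000000L +
    cal.get(Calendar.MILLISECOND) * 1000L + microsOfMilli
}
```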
This fixes the bug of loading timestamps before the cutover day from dictionary-encoded columns in Parquet files. The code below forces dictionary encoding:
```scala
spark.conf.set("spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled", true)
scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
scala>
Seq.tabulate(8)(_ => "1001-01-01 01:02:03.123").toDF("tsS")
.select($"tsS".cast("timestamp").as("ts")).repartition(1)
.write
.option("parquet.enable.dictionary", true)
.parquet(path)
```
Load the timestamps back:
```scala
scala> spark.read.parquet(path).show(false)
+-----------------------+
|ts                     |
+-----------------------+
|1001-01-07 00:32:20.123|
...
|1001-01-07 00:32:20.123|
+-----------------------+
```
The expected values are **1001-01-01 01:02:03.123**, not 1001-01-07 00:32:20.123.
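The 6-day shift is the divergence between the Julian and Proleptic Gregorian calendars around year 1001; the extra minutes/seconds offset comes from time-zone handling during the rebase. An illustrative snippet (not part of the patch) showing where 1001-01-07 comes from:
```scala
import java.time.LocalDate
import java.util.{GregorianCalendar, TimeZone}

// java.util.GregorianCalendar is hybrid: Julian before 1582-10-15.
val hybrid = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
hybrid.clear()
hybrid.set(1001, 0, 1) // 1001-01-01 in the hybrid calendar (month is 0-based)
val epochDay = Math.floorDiv(hybrid.getTimeInMillis, 86400000L)
// The same epoch day interpreted in the Proleptic Gregorian calendar:
println(LocalDate.ofEpochDay(epochDay)) // 1001-01-07
```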
After the changes, the loaded timestamps match the written values:
```scala
scala> spark.read.parquet(path).show(false)
+-----------------------+
|ts                     |
+-----------------------+
|1001-01-01 01:02:03.123|
...
|1001-01-01 01:02:03.123|
+-----------------------+
```
Modified the test `SPARK-31159: rebasing timestamps in write` in `ParquetIOSuite` to check reading dictionary-encoded timestamps.
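The read-side matrix that the test covers can be reproduced by hand roughly as follows. The rebase-in-read key is assumed by symmetry with the write flag above, and `path` points at the files written earlier:
```scala
import java.sql.Timestamp

// Both readers and both rebase-in-read settings must return the rebased
// values, since the writer records rebase info in the file metadata.
Seq(false, true).foreach { vectorized =>
  spark.conf.set("spark.sql.parquet.enableVectorizedReader", vectorized)
  Seq(false, true).foreach { rebase =>
    // Key assumed by symmetry with ...rebaseDateTimeInWrite.enabled.
    spark.conf.set("spark.sql.legacy.parquet.rebaseDateTimeInRead.enabled", rebase)
    assert(spark.read.parquet(path).collect().forall(
      _.getTimestamp(0) == Timestamp.valueOf("1001-01-01 01:02:03.123")))
  }
}
```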
Closes #28489 from MaxGekk/fix-ts-rebase-parquet-dict-enc.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 5d5866be12259c40972f7404f64d830cab87401f)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../parquet/VectorizedColumnReader.java | 31 +++++++++--
.../datasources/parquet/ParquetIOSuite.scala | 65 ++++++++++++----------
2 files changed, 64 insertions(+), 32 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 03056f5..11ce11d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -159,7 +159,11 @@ public class VectorizedColumnReader {
isSupported = originalType != OriginalType.DATE || !rebaseDateTime;
break;
case INT64:
- isSupported = originalType != OriginalType.TIMESTAMP_MILLIS;
+ if (originalType == OriginalType.TIMESTAMP_MICROS) {
+ isSupported = !rebaseDateTime;
+ } else {
+ isSupported = originalType != OriginalType.TIMESTAMP_MILLIS;
+ }
break;
case FLOAT:
case DOUBLE:
@@ -313,17 +317,36 @@ public class VectorizedColumnReader {
case INT64:
if (column.dataType() == DataTypes.LongType ||
DecimalType.is64BitDecimalType(column.dataType()) ||
- originalType == OriginalType.TIMESTAMP_MICROS) {
+ (originalType == OriginalType.TIMESTAMP_MICROS && !rebaseDateTime)) {
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
}
}
} else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
+ if (rebaseDateTime) {
+ for (int i = rowId; i < rowId + num; ++i) {
+ if (!column.isNullAt(i)) {
+ long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
+ long julianMicros = DateTimeUtils.fromMillis(julianMillis);
+ long gregorianMicros = RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros);
+ column.putLong(i, gregorianMicros);
+ }
+ }
+ } else {
+ for (int i = rowId; i < rowId + num; ++i) {
+ if (!column.isNullAt(i)) {
+ long gregorianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
+ column.putLong(i, DateTimeUtils.fromMillis(gregorianMillis));
+ }
+ }
+ }
+ } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
- column.putLong(i,
- DateTimeUtils.fromMillis(dictionary.decodeToLong(dictionaryIds.getDictId(i))));
+ long julianMicros = dictionary.decodeToLong(dictionaryIds.getDictId(i));
+ long gregorianMicros = RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros);
+ column.putLong(i, gregorianMicros);
}
}
} else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 198b68b..cf2c7c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -937,37 +937,46 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
test("SPARK-31159: rebasing timestamps in write") {
- Seq(
- ("TIMESTAMP_MILLIS", "1001-01-01 01:02:03.123", "1001-01-07 01:09:05.123"),
- ("TIMESTAMP_MICROS", "1001-01-01 01:02:03.123456", "1001-01-07 01:09:05.123456"),
- ("INT96", "1001-01-01 01:02:03.123456", "1001-01-01 01:02:03.123456")
- ).foreach { case (outType, tsStr, nonRebased) =>
- withClue(s"output type $outType") {
- withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outType) {
- withTempPath { dir =>
- val path = dir.getAbsolutePath
- withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
- Seq(tsStr).toDF("tsS")
- .select($"tsS".cast("timestamp").as("ts"))
- .write
- .parquet(path)
- }
+ val N = 8
+ Seq(false, true).foreach { dictionaryEncoding =>
+ Seq(
+ ("TIMESTAMP_MILLIS", "1001-01-01 01:02:03.123", "1001-01-07 01:09:05.123"),
+ ("TIMESTAMP_MICROS", "1001-01-01 01:02:03.123456", "1001-01-07 01:09:05.123456"),
+ ("INT96", "1001-01-01 01:02:03.123456", "1001-01-01 01:02:03.123456")
+ ).foreach { case (outType, tsStr, nonRebased) =>
+ withClue(s"output type $outType") {
+ withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outType) {
+ withTempPath { dir =>
+ val path = dir.getAbsolutePath
+ withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
+ Seq.tabulate(N)(_ => tsStr).toDF("tsS")
+ .select($"tsS".cast("timestamp").as("ts"))
+ .repartition(1)
+ .write
+ .option("parquet.enable.dictionary", dictionaryEncoding)
+ .parquet(path)
+ }
- Seq(false, true).foreach { vectorized =>
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
- // The file metadata indicates if it needs rebase or not, so we can always get the
- // correct result regardless of the "rebaseInRead" config.
- Seq(true, false).foreach { rebase =>
- withSQLConf(
- SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
- checkAnswer(spark.read.parquet(path), Row(Timestamp.valueOf(tsStr)))
+ Seq(false, true).foreach { vectorized =>
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
+ // The file metadata indicates if it needs rebase or not, so we can always get the
+ // correct result regardless of the "rebaseInRead" config.
+ Seq(true, false).foreach { rebase =>
+ withSQLConf(
+ SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
+ checkAnswer(
+ spark.read.parquet(path),
+ Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr))))
+ }
}
- }
- // Force to not rebase to prove the written datetime values are rebased
- // and we will get wrong result if we don't rebase while reading.
- withSQLConf("spark.test.forceNoRebase" -> "true") {
- checkAnswer(spark.read.parquet(path), Row(Timestamp.valueOf(nonRebased)))
+ // Force to not rebase to prove the written datetime values are rebased
+ // and we will get wrong result if we don't rebase while reading.
+ withSQLConf("spark.test.forceNoRebase" -> "true") {
+ checkAnswer(
+ spark.read.parquet(path),
+ Seq.tabulate(N)(_ => Row(Timestamp.valueOf(nonRebased))))
+ }
}
}
}