You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/05/11 05:03:12 UTC

[spark] branch branch-3.0 updated: [SPARK-31672][SQL] Fix loading of timestamps before 1582-10-15 from dictionary encoded Parquet columns

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5c6a4fc  [SPARK-31672][SQL] Fix loading of timestamps before 1582-10-15 from dictionary encoded Parquet columns
5c6a4fc is described below

commit 5c6a4fc8a71fcca9110c8c18ebd44d935514fcc1
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Mon May 11 04:58:08 2020 +0000

    [SPARK-31672][SQL] Fix loading of timestamps before 1582-10-15 from dictionary encoded Parquet columns
    
    Modified the `decodeDictionaryIds()` method of `VectorizedColumnReader` to handle especially `TimestampType` when the passed parameter `rebaseDateTime` is true. In that case, decoded milliseconds/microseconds are rebased from the hybrid calendar to Proleptic Gregorian calendar using `RebaseDateTime`.`rebaseJulianToGregorianMicros()`.
    
    This fixes the bug of loading timestamps before the cutover day from dictionary encoded column in parquet files. The code below forces dictionary encoding:
    ```scala
    spark.conf.set("spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled", true)
    scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
    scala>
    Seq.tabulate(8)(_ => "1001-01-01 01:02:03.123").toDF("tsS")
      .select($"tsS".cast("timestamp").as("ts")).repartition(1)
      .write
      .option("parquet.enable.dictionary", true)
      .parquet(path)
    ```
    Load the dates back:
    ```scala
    scala> spark.read.parquet(path).show(false)
    +-----------------------+
    |ts                     |
    +-----------------------+
    |1001-01-07 00:32:20.123|
    ...
    |1001-01-07 00:32:20.123|
    +-----------------------+
    ```
    Expected values **must be 1001-01-01 01:02:03.123** but not 1001-01-07 00:32:20.123.
    
    Yes. After the changes:
    ```scala
    scala> spark.read.parquet(path).show(false)
    +-----------------------+
    |ts                     |
    +-----------------------+
    |1001-01-01 01:02:03.123|
    ...
    |1001-01-01 01:02:03.123|
    +-----------------------+
    ```
    
    Modified the test `SPARK-31159: rebasing timestamps in write` in `ParquetIOSuite` to checked reading dictionary encoded dates.
    
    Closes #28489 from MaxGekk/fix-ts-rebase-parquet-dict-enc.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 5d5866be12259c40972f7404f64d830cab87401f)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../parquet/VectorizedColumnReader.java            | 31 +++++++++--
 .../datasources/parquet/ParquetIOSuite.scala       | 65 ++++++++++++----------
 2 files changed, 64 insertions(+), 32 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 03056f5..11ce11d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -159,7 +159,11 @@ public class VectorizedColumnReader {
         isSupported = originalType != OriginalType.DATE || !rebaseDateTime;
         break;
       case INT64:
-        isSupported = originalType != OriginalType.TIMESTAMP_MILLIS;
+        if (originalType == OriginalType.TIMESTAMP_MICROS) {
+          isSupported = !rebaseDateTime;
+        } else {
+          isSupported = originalType != OriginalType.TIMESTAMP_MILLIS;
+        }
         break;
       case FLOAT:
       case DOUBLE:
@@ -313,17 +317,36 @@ public class VectorizedColumnReader {
       case INT64:
         if (column.dataType() == DataTypes.LongType ||
             DecimalType.is64BitDecimalType(column.dataType()) ||
-            originalType == OriginalType.TIMESTAMP_MICROS) {
+            (originalType == OriginalType.TIMESTAMP_MICROS && !rebaseDateTime)) {
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
               column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
             }
           }
         } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
+          if (rebaseDateTime) {
+            for (int i = rowId; i < rowId + num; ++i) {
+              if (!column.isNullAt(i)) {
+                long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
+                long julianMicros = DateTimeUtils.fromMillis(julianMillis);
+                long gregorianMicros = RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros);
+                column.putLong(i, gregorianMicros);
+              }
+            }
+          } else {
+            for (int i = rowId; i < rowId + num; ++i) {
+              if (!column.isNullAt(i)) {
+                long gregorianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
+                column.putLong(i, DateTimeUtils.fromMillis(gregorianMillis));
+              }
+            }
+          }
+        } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
-              column.putLong(i,
-                DateTimeUtils.fromMillis(dictionary.decodeToLong(dictionaryIds.getDictId(i))));
+              long julianMicros = dictionary.decodeToLong(dictionaryIds.getDictId(i));
+              long gregorianMicros = RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros);
+              column.putLong(i, gregorianMicros);
             }
           }
         } else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 198b68b..cf2c7c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -937,37 +937,46 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 
   test("SPARK-31159: rebasing timestamps in write") {
-    Seq(
-      ("TIMESTAMP_MILLIS", "1001-01-01 01:02:03.123", "1001-01-07 01:09:05.123"),
-      ("TIMESTAMP_MICROS", "1001-01-01 01:02:03.123456", "1001-01-07 01:09:05.123456"),
-      ("INT96", "1001-01-01 01:02:03.123456", "1001-01-01 01:02:03.123456")
-    ).foreach { case (outType, tsStr, nonRebased) =>
-      withClue(s"output type $outType") {
-        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outType) {
-          withTempPath { dir =>
-            val path = dir.getAbsolutePath
-            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
-              Seq(tsStr).toDF("tsS")
-                .select($"tsS".cast("timestamp").as("ts"))
-                .write
-                .parquet(path)
-            }
+    val N = 8
+    Seq(false, true).foreach { dictionaryEncoding =>
+      Seq(
+        ("TIMESTAMP_MILLIS", "1001-01-01 01:02:03.123", "1001-01-07 01:09:05.123"),
+        ("TIMESTAMP_MICROS", "1001-01-01 01:02:03.123456", "1001-01-07 01:09:05.123456"),
+        ("INT96", "1001-01-01 01:02:03.123456", "1001-01-01 01:02:03.123456")
+      ).foreach { case (outType, tsStr, nonRebased) =>
+        withClue(s"output type $outType") {
+          withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outType) {
+            withTempPath { dir =>
+              val path = dir.getAbsolutePath
+              withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
+                Seq.tabulate(N)(_ => tsStr).toDF("tsS")
+                  .select($"tsS".cast("timestamp").as("ts"))
+                  .repartition(1)
+                  .write
+                  .option("parquet.enable.dictionary", dictionaryEncoding)
+                  .parquet(path)
+              }
 
-            Seq(false, true).foreach { vectorized =>
-              withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
-                // The file metadata indicates if it needs rebase or not, so we can always get the
-                // correct result regardless of the "rebaseInRead" config.
-                Seq(true, false).foreach { rebase =>
-                  withSQLConf(
-                    SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
-                    checkAnswer(spark.read.parquet(path), Row(Timestamp.valueOf(tsStr)))
+              Seq(false, true).foreach { vectorized =>
+                withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
+                  // The file metadata indicates if it needs rebase or not, so we can always get the
+                  // correct result regardless of the "rebaseInRead" config.
+                  Seq(true, false).foreach { rebase =>
+                    withSQLConf(
+                      SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
+                      checkAnswer(
+                        spark.read.parquet(path),
+                        Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr))))
+                    }
                   }
-                }
 
-                // Force to not rebase to prove the written datetime values are rebased
-                // and we will get wrong result if we don't rebase while reading.
-                withSQLConf("spark.test.forceNoRebase" -> "true") {
-                  checkAnswer(spark.read.parquet(path), Row(Timestamp.valueOf(nonRebased)))
+                  // Force to not rebase to prove the written datetime values are rebased
+                  // and we will get wrong result if we don't rebase while reading.
+                  withSQLConf("spark.test.forceNoRebase" -> "true") {
+                    checkAnswer(
+                      spark.read.parquet(path),
+                      Seq.tabulate(N)(_ => Row(Timestamp.valueOf(nonRebased))))
+                  }
                 }
               }
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org