You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/05/10 04:35:35 UTC

[spark] branch branch-3.0 updated: [SPARK-31662][SQL] Fix loading of dates before 1582-10-15 from dictionary encoded Parquet columns

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new eb278ba  [SPARK-31662][SQL] Fix loading of dates before 1582-10-15 from dictionary encoded Parquet columns
eb278ba is described below

commit eb278bad4cf1b5804584cdb4f6714c205149824e
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Sun May 10 13:31:26 2020 +0900

    [SPARK-31662][SQL] Fix loading of dates before 1582-10-15 from dictionary encoded Parquet columns
    
    ### What changes were proposed in this pull request?
    Modified the `decodeDictionaryIds()` method of `VectorizedColumnReader` to handle `DateType` specially when the passed parameter `rebaseDateTime` is true. In that case, decoded days are rebased from the hybrid calendar to the Proleptic Gregorian calendar using `RebaseDateTime.rebaseJulianToGregorianDays()`.
    
    ### Why are the changes needed?
    This fixes a bug in loading dates before the cutover day (1582-10-15) from dictionary-encoded columns in Parquet files. The code below forces dictionary encoding:
    ```scala
    spark.conf.set("spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled", true)
    Seq.tabulate(8)(_ => "1001-01-01").toDF("dateS")
      .select($"dateS".cast("date").as("date")).repartition(1)
      .write
      .option("parquet.enable.dictionary", true)
      .parquet(path)
    ```
    Load the dates back:
    ```scala
    spark.read.parquet(path).show(false)
    +----------+
    |date      |
    +----------+
    |1001-01-07|
    ...
    |1001-01-07|
    +----------+
    ```
    The expected values **must be 1001-01-01**, not 1001-01-07.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. After the changes:
    ```scala
    spark.read.parquet(path).show(false)
    +----------+
    |date      |
    +----------+
    |1001-01-01|
    ...
    |1001-01-01|
    +----------+
    ```
    
    ### How was this patch tested?
    Modified the test `SPARK-31159: rebasing dates in write` in `ParquetIOSuite` to check reading of dictionary-encoded dates.
    
    Closes #28479 from MaxGekk/fix-datetime-rebase-parquet-dict-enc.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit ce63bef1dac91f82cda5ebb47b38bd98eaf8164f)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../parquet/VectorizedColumnReader.java            | 37 ++++++++++++----
 .../datasources/parquet/ParquetIOSuite.scala       | 49 +++++++++++++---------
 2 files changed, 58 insertions(+), 28 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 9482244..03056f5 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -152,6 +152,24 @@ public class VectorizedColumnReader {
     return definitionLevelColumn.nextInt() == maxDefLevel;
   }
 
+  private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName) {
+    boolean isSupported = false;
+    switch (typeName) {
+      case INT32:
+        isSupported = originalType != OriginalType.DATE || !rebaseDateTime;
+        break;
+      case INT64:
+        isSupported = originalType != OriginalType.TIMESTAMP_MILLIS;
+        break;
+      case FLOAT:
+      case DOUBLE:
+      case BINARY:
+        isSupported = true;
+        break;
+    }
+    return isSupported;
+  }
+
   /**
    * Reads `total` values from this columnReader into column.
    */
@@ -181,13 +199,7 @@ public class VectorizedColumnReader {
 
         // TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we need to post process
         // the values to add microseconds precision.
-        if (column.hasDictionary() || (rowId == 0 &&
-            (typeName == PrimitiveType.PrimitiveTypeName.INT32 ||
-            (typeName == PrimitiveType.PrimitiveTypeName.INT64 &&
-              originalType != OriginalType.TIMESTAMP_MILLIS) ||
-            typeName == PrimitiveType.PrimitiveTypeName.FLOAT ||
-            typeName == PrimitiveType.PrimitiveTypeName.DOUBLE ||
-            typeName == PrimitiveType.PrimitiveTypeName.BINARY))) {
+        if (column.hasDictionary() || (rowId == 0 && isLazyDecodingSupported(typeName))) {
           // Column vector supports lazy decoding of dictionary values so just set the dictionary.
           // We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
           // non-dictionary encoded values have already been added).
@@ -266,7 +278,8 @@ public class VectorizedColumnReader {
     switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
       case INT32:
         if (column.dataType() == DataTypes.IntegerType ||
-            DecimalType.is32BitDecimalType(column.dataType())) {
+            DecimalType.is32BitDecimalType(column.dataType()) ||
+            (column.dataType() == DataTypes.DateType && !rebaseDateTime)) {
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
               column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i)));
@@ -284,6 +297,14 @@ public class VectorizedColumnReader {
               column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
             }
           }
+        } else if (column.dataType() == DataTypes.DateType) {
+          for (int i = rowId; i < rowId + num; ++i) {
+            if (!column.isNullAt(i)) {
+              int julianDays = dictionary.decodeToInt(dictionaryIds.getDictId(i));
+              int gregorianDays = RebaseDateTime.rebaseJulianToGregorianDays(julianDays);
+              column.putInt(i, gregorianDays);
+            }
+          }
         } else {
           throw constructConvertNotSupportedException(descriptor, column);
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index af66aa0..198b68b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -978,29 +978,38 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 
   test("SPARK-31159: rebasing dates in write") {
-    withTempPath { dir =>
-      val path = dir.getAbsolutePath
-      withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
-        Seq("1001-01-01").toDF("dateS")
-          .select($"dateS".cast("date").as("date"))
-          .write
-          .parquet(path)
-      }
+    val N = 8
+    Seq(false, true).foreach { dictionaryEncoding =>
+      withTempPath { dir =>
+        val path = dir.getAbsolutePath
+        withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
+          Seq.tabulate(N)(_ => "1001-01-01").toDF("dateS")
+            .select($"dateS".cast("date").as("date"))
+            .repartition(1)
+            .write
+            .option("parquet.enable.dictionary", dictionaryEncoding)
+            .parquet(path)
+        }
 
-      Seq(false, true).foreach { vectorized =>
-        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
-          // The file metadata indicates if it needs rebase or not, so we can always get the correct
-          // result regardless of the "rebaseInRead" config.
-          Seq(true, false).foreach { rebase =>
-            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
-              checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-01")))
+        Seq(false, true).foreach { vectorized =>
+          withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
+            // The file metadata indicates if it needs rebase or not, so we can always get
+            // the correct result regardless of the "rebaseInRead" config.
+            Seq(true, false).foreach { rebase =>
+              withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
+                checkAnswer(
+                  spark.read.parquet(path),
+                  Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01"))))
+              }
             }
-          }
 
-          // Force to not rebase to prove the written datetime values are rebased and we will get
-          // wrong result if we don't rebase while reading.
-          withSQLConf("spark.test.forceNoRebase" -> "true") {
-            checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-07")))
+            // Force to not rebase to prove the written datetime values are rebased and we will get
+            // wrong result if we don't rebase while reading.
+            withSQLConf("spark.test.forceNoRebase" -> "true") {
+              checkAnswer(
+                spark.read.parquet(path),
+                Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-07"))))
+            }
           }
         }
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org