Posted to commits@spark.apache.org by gu...@apache.org on 2020/05/10 04:34:02 UTC
[spark] branch master updated: [SPARK-31662][SQL] Fix loading of dates before 1582-10-15 from dictionary encoded Parquet columns
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ce63bef [SPARK-31662][SQL] Fix loading of dates before 1582-10-15 from dictionary encoded Parquet columns
ce63bef is described below
commit ce63bef1dac91f82cda5ebb47b38bd98eaf8164f
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Sun May 10 13:31:26 2020 +0900
[SPARK-31662][SQL] Fix loading of dates before 1582-10-15 from dictionary encoded Parquet columns
### What changes were proposed in this pull request?
Modified the `decodeDictionaryIds()` method of `VectorizedColumnReader` to handle `DateType` specially when the passed `rebaseDateTime` parameter is true. In that case, decoded days are rebased from the hybrid calendar to the Proleptic Gregorian calendar using `RebaseDateTime.rebaseJulianToGregorianDays()`.
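Conceptually, the rebase maps a day count produced under the hybrid Julian/Gregorian calendar to the day count of the same local date in the Proleptic Gregorian calendar. A minimal sketch of that mapping with plain JDK classes (not the actual `RebaseDateTime` implementation, which also handles eras and is optimized; CE dates only):
```scala
import java.time.LocalDate
import java.util.{Calendar, GregorianCalendar, TimeZone}

// Sketch: interpret `days` since 1970-01-01 in the hybrid Julian/Gregorian
// calendar (java.util.GregorianCalendar's default), then return the day
// count of the same year-month-day in the Proleptic Gregorian calendar.
def rebaseJulianToGregorianDays(days: Int): Int = {
  val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
  cal.clear()
  cal.set(1970, Calendar.JANUARY, 1)
  cal.add(Calendar.DAY_OF_MONTH, days)
  val rebased = LocalDate.of(
    cal.get(Calendar.YEAR),
    cal.get(Calendar.MONTH) + 1,  // Calendar months are 0-based
    cal.get(Calendar.DAY_OF_MONTH))
  Math.toIntExact(rebased.toEpochDay)
}
```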
### Why are the changes needed?
This fixes a bug in loading dates before the cutover day from dictionary encoded columns in Parquet files. The code below forces dictionary encoding:
```scala
spark.conf.set("spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled", true)
val path = "/tmp/parquet_dict_dates"  // hypothetical output directory, added for runnability
Seq.tabulate(8)(_ => "1001-01-01").toDF("dateS")
.select($"dateS".cast("date").as("date")).repartition(1)
.write
.option("parquet.enable.dictionary", true)
.parquet(path)
```
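To double-check that the writer actually produced a dictionary encoded column, the column chunk encodings can be read from the file footer. A sketch using Parquet's Java API (the part file name is hypothetical):
```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import scala.collection.JavaConverters._

// Point this at the single part file written above (name is hypothetical).
val file = HadoopInputFile.fromPath(
  new Path(s"$path/part-00000.snappy.parquet"), new Configuration())
val reader = ParquetFileReader.open(file)
try {
  for (block <- reader.getFooter.getBlocks.asScala;
       col <- block.getColumns.asScala) {
    // Expect PLAIN_DICTIONARY or RLE_DICTIONARY among the encodings.
    println(s"${col.getPath}: ${col.getEncodings.asScala.mkString(", ")}")
  }
} finally reader.close()
```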
Load the dates back:
```scala
spark.read.parquet(path).show(false)
+----------+
|date |
+----------+
|1001-01-07|
...
|1001-01-07|
+----------+
```
Expected values **must be 1001-01-01** but not 1001-01-07.
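The 6-day shift is exactly the Julian/Gregorian calendar difference around the year 1001. A quick way to reproduce it with plain JDK classes (the values in comments were computed this way, not taken from the file):
```scala
import java.time.LocalDate
import java.util.{Calendar, GregorianCalendar, TimeZone}

// Day count the writer stores for 1001-01-01: days since 1970-01-01 in the
// hybrid calendar, which follows the Julian calendar before the 1582 cutover.
val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
cal.clear()
cal.set(1001, Calendar.JANUARY, 1)
val storedDays = Math.toIntExact(cal.getTimeInMillis / 86400000L)  // -353914

// Reading that raw count back as a Proleptic Gregorian day, with no rebase,
// lands 6 days later: the wrong 1001-01-07 shown above.
LocalDate.ofEpochDay(storedDays)  // 1001-01-07
```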
### Does this PR introduce _any_ user-facing change?
Yes. After the changes:
```scala
spark.read.parquet(path).show(false)
+----------+
|date |
+----------+
|1001-01-01|
...
|1001-01-01|
+----------+
```
### How was this patch tested?
Modified the test `SPARK-31159: rebasing dates in write` in `ParquetIOSuite` to check reading dictionary encoded dates.
Closes #28479 from MaxGekk/fix-datetime-rebase-parquet-dict-enc.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../parquet/VectorizedColumnReader.java | 37 ++++++++++++----
.../datasources/parquet/ParquetIOSuite.scala | 49 +++++++++++++---------
2 files changed, 58 insertions(+), 28 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 7ae60f2..6c8587c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -152,6 +152,24 @@ public class VectorizedColumnReader {
return definitionLevelColumn.nextInt() == maxDefLevel;
}
+ private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName) {
+ boolean isSupported = false;
+ switch (typeName) {
+ case INT32:
+ isSupported = originalType != OriginalType.DATE || !rebaseDateTime;
+ break;
+ case INT64:
+ isSupported = originalType != OriginalType.TIMESTAMP_MILLIS;
+ break;
+ case FLOAT:
+ case DOUBLE:
+ case BINARY:
+ isSupported = true;
+ break;
+ }
+ return isSupported;
+ }
+
/**
* Reads `total` values from this columnReader into column.
*/
@@ -181,13 +199,7 @@ public class VectorizedColumnReader {
// TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we need to post process
// the values to add microseconds precision.
- if (column.hasDictionary() || (rowId == 0 &&
- (typeName == PrimitiveType.PrimitiveTypeName.INT32 ||
- (typeName == PrimitiveType.PrimitiveTypeName.INT64 &&
- originalType != OriginalType.TIMESTAMP_MILLIS) ||
- typeName == PrimitiveType.PrimitiveTypeName.FLOAT ||
- typeName == PrimitiveType.PrimitiveTypeName.DOUBLE ||
- typeName == PrimitiveType.PrimitiveTypeName.BINARY))) {
+ if (column.hasDictionary() || (rowId == 0 && isLazyDecodingSupported(typeName))) {
// Column vector supports lazy decoding of dictionary values so just set the dictionary.
// We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
// non-dictionary encoded values have already been added).
@@ -266,7 +278,8 @@ public class VectorizedColumnReader {
switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT32:
if (column.dataType() == DataTypes.IntegerType ||
- DecimalType.is32BitDecimalType(column.dataType())) {
+ DecimalType.is32BitDecimalType(column.dataType()) ||
+ (column.dataType() == DataTypes.DateType && !rebaseDateTime)) {
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i)));
@@ -284,6 +297,14 @@ public class VectorizedColumnReader {
column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
}
}
+ } else if (column.dataType() == DataTypes.DateType) {
+ for (int i = rowId; i < rowId + num; ++i) {
+ if (!column.isNullAt(i)) {
+ int julianDays = dictionary.decodeToInt(dictionaryIds.getDictId(i));
+ int gregorianDays = RebaseDateTime.rebaseJulianToGregorianDays(julianDays);
+ column.putInt(i, gregorianDays);
+ }
+ }
} else {
throw constructConvertNotSupportedException(descriptor, column);
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index af66aa0..198b68b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -978,29 +978,38 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
test("SPARK-31159: rebasing dates in write") {
- withTempPath { dir =>
- val path = dir.getAbsolutePath
- withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
- Seq("1001-01-01").toDF("dateS")
- .select($"dateS".cast("date").as("date"))
- .write
- .parquet(path)
- }
+ val N = 8
+ Seq(false, true).foreach { dictionaryEncoding =>
+ withTempPath { dir =>
+ val path = dir.getAbsolutePath
+ withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
+ Seq.tabulate(N)(_ => "1001-01-01").toDF("dateS")
+ .select($"dateS".cast("date").as("date"))
+ .repartition(1)
+ .write
+ .option("parquet.enable.dictionary", dictionaryEncoding)
+ .parquet(path)
+ }
- Seq(false, true).foreach { vectorized =>
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
- // The file metadata indicates if it needs rebase or not, so we can always get the correct
- // result regardless of the "rebaseInRead" config.
- Seq(true, false).foreach { rebase =>
- withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
- checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-01")))
+ Seq(false, true).foreach { vectorized =>
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
+ // The file metadata indicates if it needs rebase or not, so we can always get
+ // the correct result regardless of the "rebaseInRead" config.
+ Seq(true, false).foreach { rebase =>
+ withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
+ checkAnswer(
+ spark.read.parquet(path),
+ Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01"))))
+ }
}
- }
- // Force to not rebase to prove the written datetime values are rebased and we will get
- // wrong result if we don't rebase while reading.
- withSQLConf("spark.test.forceNoRebase" -> "true") {
- checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-07")))
+ // Force to not rebase to prove the written datetime values are rebased and we will get
+ // wrong result if we don't rebase while reading.
+ withSQLConf("spark.test.forceNoRebase" -> "true") {
+ checkAnswer(
+ spark.read.parquet(path),
+ Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-07"))))
+ }
}
}
}