You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2024/03/20 06:05:09 UTC

(spark) branch master updated: [SPARK-47447][SQL] Allow reading Parquet TimestampLTZ as TimestampNTZ

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c3a04fa59ce1 [SPARK-47447][SQL] Allow reading Parquet TimestampLTZ as TimestampNTZ
c3a04fa59ce1 is described below

commit c3a04fa59ce1aabe4818430ae294fb8d210c0e4b
Author: Gengliang Wang <ge...@apache.org>
AuthorDate: Tue Mar 19 23:04:59 2024 -0700

    [SPARK-47447][SQL] Allow reading Parquet TimestampLTZ as TimestampNTZ
    
    ### What changes were proposed in this pull request?
    
    Currently, Parquet TimestampNTZ type columns can be read as TimestampLTZ, while reading TimestampLTZ as TimestampNTZ will cause errors. This makes it impossible to read Parquet files containing both TimestampLTZ and TimestampNTZ as TimestampNTZ.
    
    To make the data type system on Parquet simpler, this PR allows reading TimestampLTZ as TimestampNTZ in the Parquet data source.
    
    ### Why are the changes needed?
    
    * Make it possible to read Parquet files containing both TimestampLTZ and TimestampNTZ as TimestampNTZ
    * Make the data type system on Parquet simpler
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, Parquet TimestampLTZ type columns are now allowed to be read as TimestampNTZ.
    
    ### How was this patch tested?
    
    UT
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #45571 from gengliangwang/allowReadLTZAsNTZ.
    
    Authored-by: Gengliang Wang <ge...@apache.org>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 .../parquet/ParquetVectorUpdaterFactory.java       | 19 ++-----------------
 .../datasources/parquet/ParquetRowConverter.scala  | 16 ++++++++++++----
 .../datasources/parquet/ParquetQuerySuite.scala    | 22 +++++++---------------
 3 files changed, 21 insertions(+), 36 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index abb44915cbcd..b6065c24f2ec 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -148,12 +148,10 @@ public class ParquetVectorUpdaterFactory {
           }
         } else if (sparkType == DataTypes.TimestampNTZType &&
           isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
-          validateTimestampNTZType();
           // TIMESTAMP_NTZ is a new data type and has no legacy files that need to do rebase.
           return new LongUpdater();
         } else if (sparkType == DataTypes.TimestampNTZType &&
           isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
-          validateTimestampNTZType();
           // TIMESTAMP_NTZ is a new data type and has no legacy files that need to do rebase.
           return new LongAsMicrosUpdater();
         } else if (sparkType instanceof DayTimeIntervalType) {
@@ -176,7 +174,8 @@ public class ParquetVectorUpdaterFactory {
       }
       case INT96 -> {
         if (sparkType == DataTypes.TimestampNTZType) {
-          convertErrorForTimestampNTZ(typeName.name());
+          // TimestampNTZ type does not require rebasing due to its lack of time zone context.
+          return new BinaryToSQLTimestampUpdater();
         } else if (sparkType == DataTypes.TimestampType) {
           final boolean failIfRebase = "EXCEPTION".equals(int96RebaseMode);
           if (!shouldConvertTimestamps()) {
@@ -232,20 +231,6 @@ public class ParquetVectorUpdaterFactory {
       annotation.getUnit() == unit;
   }
 
-  private void validateTimestampNTZType() {
-    assert(logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation);
-    // Throw an exception if the Parquet type is TimestampLTZ as the Catalyst type is TimestampNTZ.
-    // This is to avoid mistakes in reading the timestamp values.
-    if (((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).isAdjustedToUTC()) {
-      convertErrorForTimestampNTZ("int64 time(" + logicalTypeAnnotation + ")");
-    }
-  }
-
-  void convertErrorForTimestampNTZ(String parquetType) {
-    throw new RuntimeException("Unable to create Parquet converter for data type " +
-      DataTypes.TimestampNTZType.json() + " whose Parquet type is " + parquetType);
-  }
-
   boolean isUnsignedIntTypeMatched(int bitWidth) {
     return logicalTypeAnnotation instanceof IntLogicalTypeAnnotation annotation &&
       !annotation.isSigned() && annotation.getBitWidth() == bitWidth;
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 1f4522aef2bb..3f5754f27ae3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -436,6 +436,17 @@ private[parquet] class ParquetRowConverter(
           }
         }
 
+      // INT96 timestamp doesn't have a logical type, here we check the physical type instead.
+      case TimestampNTZType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT96 =>
+        new ParquetPrimitiveConverter(updater) {
+          // Converts nanosecond timestamps stored as INT96.
+          // TimestampNTZ type does not require rebasing due to its lack of time zone context.
+          override def addBinary(value: Binary): Unit = {
+            val julianMicros = ParquetRowConverter.binaryToSQLTimestamp(value)
+            this.updater.setLong(julianMicros)
+          }
+        }
+
       case TimestampNTZType
         if canReadAsTimestampNTZ(parquetType) &&
           parquetType.getLogicalTypeAnnotation
@@ -536,10 +547,7 @@ private[parquet] class ParquetRowConverter(
   // can be read as Spark's TimestampNTZ type. This is to avoid mistakes in reading the timestamp
   // values.
   private def canReadAsTimestampNTZ(parquetType: Type): Boolean =
-    parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 &&
-    parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] &&
-    !parquetType.getLogicalTypeAnnotation
-      .asInstanceOf[TimestampLogicalTypeAnnotation].isAdjustedToUTC
+    parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation]
 
   /**
    * Parquet converter for strings. A dictionary is used to minimize string decoding cost.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 02937ae0fea8..26641cd18d9c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -185,29 +185,21 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
     }
   }
 
-  test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") {
-    val data = (1 to 1000).map { i =>
-      val ts = new java.sql.Timestamp(i)
-      Row(ts)
-    }
-    val actualSchema = StructType(Seq(StructField("time", TimestampType, false)))
+  test("SPARK-47447: read TimestampLTZ as TimestampNTZ") {
     val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false)))
 
     Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType =>
       Seq(true, false).foreach { dictionaryEnabled =>
         withSQLConf(
-            SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsType,
-            ParquetOutputFormat.ENABLE_DICTIONARY -> dictionaryEnabled.toString) {
+          SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsType,
+          ParquetOutputFormat.ENABLE_DICTIONARY -> dictionaryEnabled.toString,
+          SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") {
           withTempPath { file =>
-            val df = spark.createDataFrame(sparkContext.parallelize(data), actualSchema)
+            val df = sql("select timestamp'2021-02-02 16:00:00' as time")
             df.write.parquet(file.getCanonicalPath)
             withAllParquetReaders {
-              val e = intercept[SparkException] {
-                spark.read.schema(providedSchema).parquet(file.getCanonicalPath).collect()
-              }
-              assert(e.getErrorClass == "FAILED_READ_FILE")
-              assert(e.getCause.getMessage.contains(
-                "Unable to create Parquet converter for data type \"timestamp_ntz\""))
+              val df2 = spark.read.schema(providedSchema).parquet(file.getCanonicalPath)
+              checkAnswer(df2, Row(LocalDateTime.parse("2021-02-03T00:00:00")))
             }
           }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org