You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/02/06 09:34:42 UTC

[spark] branch branch-3.4 updated: [SPARK-40819][SQL] Timestamp nanos behaviour regression

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new d2be9fda698 [SPARK-40819][SQL] Timestamp nanos behaviour regression
d2be9fda698 is described below

commit d2be9fda698fdcc6e61763bd3e2aa7b6dc1f3b5c
Author: alfreddavidson <al...@gmail.com>
AuthorDate: Mon Feb 6 18:34:07 2023 +0900

    [SPARK-40819][SQL] Timestamp nanos behaviour regression
    
    ### What changes were proposed in this pull request?
    
    Handle `TimeUnit.NANOS` for parquet `Timestamps` addressing a regression in behaviour since 3.2
    
    ### Why are the changes needed?
    
    Since version 3.2, reading Parquet files that contain attributes of type `TIMESTAMP(NANOS,true)` is not possible because `ParquetSchemaConverter` fails with:
    ```
    Caused by: org.apache.spark.sql.AnalysisException: Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))
    ```
    https://issues.apache.org/jira/browse/SPARK-34661 introduced a change that matches on the `LogicalTypeAnnotation`, which only covers the Timestamp cases for `TimeUnit.MILLIS` and `TimeUnit.MICROS`, meaning that `TimeUnit.NANOS` falls through to `illegalType()`.
    
    Prior to 3.2 the matching used the `originalType`, which for `TIMESTAMP(NANOS,true)` returns `null` and therefore resulted in a `LongType`. The proposed change is to consider `TimeUnit.NANOS` and return `LongType` when `spark.sql.legacy.parquet.nanosAsLong` is enabled, making the behaviour the same as before.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added unit test covering this scenario.
    Internally deployed to read parquet files that contain `TIMESTAMP(NANOS,true)`
    
    Closes #38312 from awdavidson/ts-nanos-fix.
    
    Lead-authored-by: alfreddavidson <al...@gmail.com>
    Co-authored-by: Attila Zsolt Piros <20...@users.noreply.github.com>
    Co-authored-by: awdavidson <54...@users.noreply.github.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit ceccda07076240a13759354dda35d929445a90e8)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |   9 +++
 .../parquet/SpecificParquetRecordReaderBase.java   |   2 +
 .../datasources/parquet/ParquetFileFormat.scala    |  11 +++-
 .../parquet/ParquetSchemaConverter.scala           |  15 ++++-
 .../datasources/parquet/ParquetUtils.scala         |   4 ++
 .../datasources/v2/parquet/ParquetScan.scala       |   3 +
 .../resources/test-data/timestamp-nanos.parquet    | Bin 0 -> 784 bytes
 .../datasources/parquet/ParquetSchemaSuite.scala   |  72 +++++++++++++++++++--
 8 files changed, 104 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 2f05c356160..ecc35850bf0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3774,6 +3774,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_PARQUET_NANOS_AS_LONG = buildConf("spark.sql.legacy.parquet.nanosAsLong")
+    .internal()
+    .doc("When true, the Parquet's nanos precision timestamps are converted to SQL long values.")
+    .version("3.2.3")
+    .booleanConf
+    .createWithDefault(false)
+
   val PARQUET_INT96_REBASE_MODE_IN_WRITE =
     buildConf("spark.sql.parquet.int96RebaseModeInWrite")
       .internal()
@@ -4943,6 +4950,8 @@ class SQLConf extends Serializable with Logging {
 
   def ignoreMissingParquetFieldId: Boolean = getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)
 
+  def legacyParquetNanosAsLong: Boolean = getConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG)
+
   def parquetInferTimestampNTZEnabled: Boolean = getConf(PARQUET_INFER_TIMESTAMP_NTZ_ENABLED)
 
   def useV1Command: Boolean = getConf(SQLConf.LEGACY_USE_V1_COMMAND)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index b14f329b413..330296b64c5 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -150,6 +150,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
     config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
     config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
+    config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
 
     this.file = new Path(path);
     long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
@@ -201,6 +202,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
     config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
     config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
+    config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
     this.parquetColumn = new ParquetToSparkSchemaConverter(config)
       .convertParquetColumn(requestedSchema, Option.empty());
     this.sparkSchema = (StructType) parquetColumn.sparkType();
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index afa00aa6f37..5789f252c3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -153,6 +153,10 @@ class ParquetFileFormat
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
       sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
+    hadoopConf.setBoolean(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      sparkSession.sessionState.conf.legacyParquetNanosAsLong)
+
 
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
@@ -357,7 +361,8 @@ object ParquetFileFormat extends Logging {
     val converter = new ParquetToSparkSchemaConverter(
       sparkSession.sessionState.conf.isParquetBinaryAsString,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp,
-      inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
+      inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled,
+      nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong)
 
     val seen = mutable.HashSet[String]()
     val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
@@ -454,13 +459,15 @@ object ParquetFileFormat extends Logging {
     val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
     val inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled
+    val nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong
 
     val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {
       // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
       val converter = new ParquetToSparkSchemaConverter(
         assumeBinaryIsString = assumeBinaryIsString,
         assumeInt96IsTimestamp = assumeInt96IsTimestamp,
-        inferTimestampNTZ = inferTimestampNTZ)
+        inferTimestampNTZ = inferTimestampNTZ,
+        nanosAsLong = nanosAsLong)
 
       readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
         .map(ParquetFileFormat.readSchemaFromFooter(_, converter))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index f6b02579d31..9c9e7ce729c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -49,24 +49,28 @@ import org.apache.spark.sql.types._
  * @param caseSensitive Whether use case sensitive analysis when comparing Spark catalyst read
  *                      schema with Parquet schema.
  * @param inferTimestampNTZ Whether TimestampNTZType type is enabled.
+ * @param nanosAsLong Whether timestamps with nanos are converted to long.
  */
 class ParquetToSparkSchemaConverter(
     assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
     assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
     caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get,
-    inferTimestampNTZ: Boolean = SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get) {
+    inferTimestampNTZ: Boolean = SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get,
+    nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get) {
 
   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
     assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
     caseSensitive = conf.caseSensitiveAnalysis,
-    inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled)
+    inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled,
+    nanosAsLong = conf.legacyParquetNanosAsLong)
 
   def this(conf: Configuration) = this(
     assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
     assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
     caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean,
-    inferTimestampNTZ = conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean)
+    inferTimestampNTZ = conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean,
+    nanosAsLong = conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean)
 
   /**
    * Returns true if TIMESTAMP_NTZ type is enabled in this ParquetToSparkSchemaConverter.
@@ -271,6 +275,11 @@ class ParquetToSparkSchemaConverter(
             } else {
               TimestampNTZType
             }
+          // SPARK-40819: NANOS are not supported as a Timestamp, convert to LongType without
+          // timezone awareness to address behaviour regression introduced by SPARK-34661
+          case timestamp: TimestampLogicalTypeAnnotation
+            if timestamp.getUnit == TimeUnit.NANOS && nanosAsLong =>
+            LongType
           case _ => illegalType()
         }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
index de4eda1acfd..a6d13d072f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
@@ -461,6 +461,10 @@ object ParquetUtils extends Logging {
       SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
       sqlConf.parquetFieldIdWriteEnabled.toString)
 
+    conf.set(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      sqlConf.legacyParquetNanosAsLong.toString)
+
     // Sets compression scheme
     conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index 7495893a911..feca878498d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -91,6 +91,9 @@ case class ParquetScan(
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
       sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
+    hadoopConf.setBoolean(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      sparkSession.sessionState.conf.legacyParquetNanosAsLong)
 
     val broadcastedConf = sparkSession.sparkContext.broadcast(
       new SerializableConfiguration(hadoopConf))
diff --git a/sql/core/src/test/resources/test-data/timestamp-nanos.parquet b/sql/core/src/test/resources/test-data/timestamp-nanos.parquet
new file mode 100644
index 00000000000..962aa909b82
Binary files /dev/null and b/sql/core/src/test/resources/test-data/timestamp-nanos.parquet differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 468e31d1879..5589c61be7a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -29,6 +29,7 @@ import org.apache.parquet.schema.Type._
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
+import org.apache.spark.sql.functions.desc
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType._
 import org.apache.spark.sql.test.SharedSparkSession
@@ -45,7 +46,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       binaryAsString: Boolean,
       int96AsTimestamp: Boolean,
       writeLegacyParquetFormat: Boolean,
-      expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+      expectedParquetColumn: Option[ParquetColumn] = None,
+      nanosAsLong: Boolean = false): Unit = {
     testSchema(
       testName,
       StructType.fromAttributes(ScalaReflection.attributesFor[T]),
@@ -53,7 +55,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       binaryAsString,
       int96AsTimestamp,
       writeLegacyParquetFormat,
-      expectedParquetColumn = expectedParquetColumn)
+      expectedParquetColumn = expectedParquetColumn,
+      nanosAsLong = nanosAsLong)
   }
 
   protected def testParquetToCatalyst(
@@ -65,12 +68,14 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       caseSensitive: Boolean = false,
       inferTimestampNTZ: Boolean = true,
       sparkReadSchema: Option[StructType] = None,
-      expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+      expectedParquetColumn: Option[ParquetColumn] = None,
+      nanosAsLong: Boolean = false): Unit = {
     val converter = new ParquetToSparkSchemaConverter(
       assumeBinaryIsString = binaryAsString,
       assumeInt96IsTimestamp = int96AsTimestamp,
       caseSensitive = caseSensitive,
-      inferTimestampNTZ = inferTimestampNTZ)
+      inferTimestampNTZ = inferTimestampNTZ,
+      nanosAsLong = nanosAsLong)
 
     test(s"sql <= parquet: $testName") {
       val actualParquetColumn = converter.convertParquetColumn(
@@ -119,7 +124,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       writeLegacyParquetFormat: Boolean,
       outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
         SQLConf.ParquetOutputTimestampType.INT96,
-      expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+      expectedParquetColumn: Option[ParquetColumn] = None,
+      nanosAsLong: Boolean = false): Unit = {
 
     testCatalystToParquet(
       testName,
@@ -134,7 +140,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       parquetSchema,
       binaryAsString,
       int96AsTimestamp,
-      expectedParquetColumn = expectedParquetColumn)
+      expectedParquetColumn = expectedParquetColumn,
+      nanosAsLong = nanosAsLong)
   }
 
   protected def compareParquetColumn(actual: ParquetColumn, expected: ParquetColumn): Unit = {
@@ -149,7 +156,14 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       val expectedDesc = expected.descriptor.get
       assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
       assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
-      assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
+
+      actualDesc.getPrimitiveType.getLogicalTypeAnnotation match {
+        case timestamp: LogicalTypeAnnotation.TimestampLogicalTypeAnnotation
+          if timestamp.getUnit == LogicalTypeAnnotation.TimeUnit.NANOS =>
+          assert(actual.sparkType == expected.sparkType)
+        case _ =>
+          assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
+      }
     }
 
     assert(actual.repetitionLevel == expected.repetitionLevel, "repetition level mismatch: " +
@@ -197,6 +211,32 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
 }
 
 class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
+  testSchemaInference[Tuple1[Long]](
+    "timestamp nanos",
+    """
+      |message root {
+      |  required int64 _1 (TIMESTAMP(NANOS,true));
+      |}
+    """.stripMargin,
+    binaryAsString = false,
+    int96AsTimestamp = true,
+    writeLegacyParquetFormat = true,
+    expectedParquetColumn = Some(
+      ParquetColumn(
+        sparkType = StructType.fromAttributes(
+          ScalaReflection.attributesFor[Tuple1[Long]]),
+        descriptor = None,
+        repetitionLevel = 0,
+        definitionLevel = 0,
+        required = false,
+        path = Seq(),
+        children = Seq(
+          primitiveParquetColumn(LongType, PrimitiveTypeName.INT64, Repetition.REQUIRED,
+            0, 0, Seq("_1"), logicalTypeAnnotation = Some(LogicalTypeAnnotation.intType(64, false)))
+        ))),
+    nanosAsLong = true
+  )
+
   testSchemaInference[(Boolean, Int, Long, Float, Double, Array[Byte])](
     "basic types",
     """
@@ -1027,6 +1067,24 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }
 
+  test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with nanosAsLong=true)") {
+    val tsAttribute = "birthday"
+    withSQLConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key -> "true") {
+      val testDataPath = testFile("test-data/timestamp-nanos.parquet")
+      val data = spark.read.parquet(testDataPath).select(tsAttribute)
+      assert(data.schema.fields.head.dataType == LongType)
+      assert(data.orderBy(desc(tsAttribute)).take(1).head.getAs[Long](0) == 1668537129123534758L)
+    }
+  }
+
+  test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with default nanosAsLong=false)") {
+    val testDataPath = testFile("test-data/timestamp-nanos.parquet")
+    val e = intercept[org.apache.spark.SparkException] {
+      spark.read.parquet(testDataPath).collect()
+    }
+    assert(e.getMessage.contains("Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))."))
+  }
+
   // =======================================================
   // Tests for converting Parquet LIST to Catalyst ArrayType
   // =======================================================


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org