You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/02/06 09:34:42 UTC
[spark] branch branch-3.4 updated: [SPARK-40819][SQL] Timestamp nanos behaviour regression
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new d2be9fda698 [SPARK-40819][SQL] Timestamp nanos behaviour regression
d2be9fda698 is described below
commit d2be9fda698fdcc6e61763bd3e2aa7b6dc1f3b5c
Author: alfreddavidson <al...@gmail.com>
AuthorDate: Mon Feb 6 18:34:07 2023 +0900
[SPARK-40819][SQL] Timestamp nanos behaviour regression
### What changes were proposed in this pull request?
Handle `TimeUnit.NANOS` for parquet `Timestamps` addressing a regression in behaviour since 3.2
### Why are the changes needed?
Since version 3.2 reading parquet files that contain attributes with type `TIMESTAMP(NANOS,true)` is not possible as ParquetSchemaConverter returns
```
Caused by: org.apache.spark.sql.AnalysisException: Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))
```
https://issues.apache.org/jira/browse/SPARK-34661 introduced a change matching on the `LogicalTypeAnnotation`, which only covers Timestamp cases for `TimeUnit.MILLIS` and `TimeUnit.MICROS`, meaning `TimeUnit.NANOS` would return `illegalType()`
Prior to 3.2 the matching used the `originalType`, which for `TIMESTAMP(NANOS,true)` returned `null` and therefore resulted in a `LongType`. The change proposed is to consider `TimeUnit.NANOS` and return `LongType`, making the behaviour the same as before.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit test covering this scenario.
Internally deployed to read parquet files that contain `TIMESTAMP(NANOS,true)`
Closes #38312 from awdavidson/ts-nanos-fix.
Lead-authored-by: alfreddavidson <al...@gmail.com>
Co-authored-by: Attila Zsolt Piros <20...@users.noreply.github.com>
Co-authored-by: awdavidson <54...@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
(cherry picked from commit ceccda07076240a13759354dda35d929445a90e8)
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 9 +++
.../parquet/SpecificParquetRecordReaderBase.java | 2 +
.../datasources/parquet/ParquetFileFormat.scala | 11 +++-
.../parquet/ParquetSchemaConverter.scala | 15 ++++-
.../datasources/parquet/ParquetUtils.scala | 4 ++
.../datasources/v2/parquet/ParquetScan.scala | 3 +
.../resources/test-data/timestamp-nanos.parquet | Bin 0 -> 784 bytes
.../datasources/parquet/ParquetSchemaSuite.scala | 72 +++++++++++++++++++--
8 files changed, 104 insertions(+), 12 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 2f05c356160..ecc35850bf0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3774,6 +3774,13 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val LEGACY_PARQUET_NANOS_AS_LONG = buildConf("spark.sql.legacy.parquet.nanosAsLong")
+ .internal()
+ .doc("When true, the Parquet's nanos precision timestamps are converted to SQL long values.")
+ .version("3.2.3")
+ .booleanConf
+ .createWithDefault(false)
+
val PARQUET_INT96_REBASE_MODE_IN_WRITE =
buildConf("spark.sql.parquet.int96RebaseModeInWrite")
.internal()
@@ -4943,6 +4950,8 @@ class SQLConf extends Serializable with Logging {
def ignoreMissingParquetFieldId: Boolean = getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)
+ def legacyParquetNanosAsLong: Boolean = getConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG)
+
def parquetInferTimestampNTZEnabled: Boolean = getConf(PARQUET_INFER_TIMESTAMP_NTZ_ENABLED)
def useV1Command: Boolean = getConf(SQLConf.LEGACY_USE_V1_COMMAND)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index b14f329b413..330296b64c5 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -150,6 +150,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
+ config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
this.file = new Path(path);
long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
@@ -201,6 +202,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
+ config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
this.parquetColumn = new ParquetToSparkSchemaConverter(config)
.convertParquetColumn(requestedSchema, Option.empty());
this.sparkSchema = (StructType) parquetColumn.sparkType();
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index afa00aa6f37..5789f252c3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -153,6 +153,10 @@ class ParquetFileFormat
hadoopConf.setBoolean(
SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
+ hadoopConf.setBoolean(
+ SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+ sparkSession.sessionState.conf.legacyParquetNanosAsLong)
+
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
@@ -357,7 +361,8 @@ object ParquetFileFormat extends Logging {
val converter = new ParquetToSparkSchemaConverter(
sparkSession.sessionState.conf.isParquetBinaryAsString,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp,
- inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
+ inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled,
+ nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong)
val seen = mutable.HashSet[String]()
val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
@@ -454,13 +459,15 @@ object ParquetFileFormat extends Logging {
val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
val inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled
+ val nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong
val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {
// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
val converter = new ParquetToSparkSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp,
- inferTimestampNTZ = inferTimestampNTZ)
+ inferTimestampNTZ = inferTimestampNTZ,
+ nanosAsLong = nanosAsLong)
readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
.map(ParquetFileFormat.readSchemaFromFooter(_, converter))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index f6b02579d31..9c9e7ce729c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -49,24 +49,28 @@ import org.apache.spark.sql.types._
* @param caseSensitive Whether use case sensitive analysis when comparing Spark catalyst read
* schema with Parquet schema.
* @param inferTimestampNTZ Whether TimestampNTZType type is enabled.
+ * @param nanosAsLong Whether timestamps with nanos are converted to long.
*/
class ParquetToSparkSchemaConverter(
assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get,
- inferTimestampNTZ: Boolean = SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get) {
+ inferTimestampNTZ: Boolean = SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get,
+ nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get) {
def this(conf: SQLConf) = this(
assumeBinaryIsString = conf.isParquetBinaryAsString,
assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
caseSensitive = conf.caseSensitiveAnalysis,
- inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled)
+ inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled,
+ nanosAsLong = conf.legacyParquetNanosAsLong)
def this(conf: Configuration) = this(
assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean,
- inferTimestampNTZ = conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean)
+ inferTimestampNTZ = conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean,
+ nanosAsLong = conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean)
/**
* Returns true if TIMESTAMP_NTZ type is enabled in this ParquetToSparkSchemaConverter.
@@ -271,6 +275,11 @@ class ParquetToSparkSchemaConverter(
} else {
TimestampNTZType
}
+ // SPARK-40819: NANOS are not supported as a Timestamp, convert to LongType without
+ // timezone awareness to address behaviour regression introduced by SPARK-34661
+ case timestamp: TimestampLogicalTypeAnnotation
+ if timestamp.getUnit == TimeUnit.NANOS && nanosAsLong =>
+ LongType
case _ => illegalType()
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
index de4eda1acfd..a6d13d072f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
@@ -461,6 +461,10 @@ object ParquetUtils extends Logging {
SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
sqlConf.parquetFieldIdWriteEnabled.toString)
+ conf.set(
+ SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+ sqlConf.legacyParquetNanosAsLong.toString)
+
// Sets compression scheme
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index 7495893a911..feca878498d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -91,6 +91,9 @@ case class ParquetScan(
hadoopConf.setBoolean(
SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
+ hadoopConf.setBoolean(
+ SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+ sparkSession.sessionState.conf.legacyParquetNanosAsLong)
val broadcastedConf = sparkSession.sparkContext.broadcast(
new SerializableConfiguration(hadoopConf))
diff --git a/sql/core/src/test/resources/test-data/timestamp-nanos.parquet b/sql/core/src/test/resources/test-data/timestamp-nanos.parquet
new file mode 100644
index 00000000000..962aa909b82
Binary files /dev/null and b/sql/core/src/test/resources/test-data/timestamp-nanos.parquet differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 468e31d1879..5589c61be7a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -29,6 +29,7 @@ import org.apache.parquet.schema.Type._
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
+import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType._
import org.apache.spark.sql.test.SharedSparkSession
@@ -45,7 +46,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
binaryAsString: Boolean,
int96AsTimestamp: Boolean,
writeLegacyParquetFormat: Boolean,
- expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+ expectedParquetColumn: Option[ParquetColumn] = None,
+ nanosAsLong: Boolean = false): Unit = {
testSchema(
testName,
StructType.fromAttributes(ScalaReflection.attributesFor[T]),
@@ -53,7 +55,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
binaryAsString,
int96AsTimestamp,
writeLegacyParquetFormat,
- expectedParquetColumn = expectedParquetColumn)
+ expectedParquetColumn = expectedParquetColumn,
+ nanosAsLong = nanosAsLong)
}
protected def testParquetToCatalyst(
@@ -65,12 +68,14 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
caseSensitive: Boolean = false,
inferTimestampNTZ: Boolean = true,
sparkReadSchema: Option[StructType] = None,
- expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+ expectedParquetColumn: Option[ParquetColumn] = None,
+ nanosAsLong: Boolean = false): Unit = {
val converter = new ParquetToSparkSchemaConverter(
assumeBinaryIsString = binaryAsString,
assumeInt96IsTimestamp = int96AsTimestamp,
caseSensitive = caseSensitive,
- inferTimestampNTZ = inferTimestampNTZ)
+ inferTimestampNTZ = inferTimestampNTZ,
+ nanosAsLong = nanosAsLong)
test(s"sql <= parquet: $testName") {
val actualParquetColumn = converter.convertParquetColumn(
@@ -119,7 +124,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
writeLegacyParquetFormat: Boolean,
outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
SQLConf.ParquetOutputTimestampType.INT96,
- expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+ expectedParquetColumn: Option[ParquetColumn] = None,
+ nanosAsLong: Boolean = false): Unit = {
testCatalystToParquet(
testName,
@@ -134,7 +140,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
parquetSchema,
binaryAsString,
int96AsTimestamp,
- expectedParquetColumn = expectedParquetColumn)
+ expectedParquetColumn = expectedParquetColumn,
+ nanosAsLong = nanosAsLong)
}
protected def compareParquetColumn(actual: ParquetColumn, expected: ParquetColumn): Unit = {
@@ -149,7 +156,14 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
val expectedDesc = expected.descriptor.get
assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
- assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
+
+ actualDesc.getPrimitiveType.getLogicalTypeAnnotation match {
+ case timestamp: LogicalTypeAnnotation.TimestampLogicalTypeAnnotation
+ if timestamp.getUnit == LogicalTypeAnnotation.TimeUnit.NANOS =>
+ assert(actual.sparkType == expected.sparkType)
+ case _ =>
+ assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
+ }
}
assert(actual.repetitionLevel == expected.repetitionLevel, "repetition level mismatch: " +
@@ -197,6 +211,32 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
}
class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
+ testSchemaInference[Tuple1[Long]](
+ "timestamp nanos",
+ """
+ |message root {
+ | required int64 _1 (TIMESTAMP(NANOS,true));
+ |}
+ """.stripMargin,
+ binaryAsString = false,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true,
+ expectedParquetColumn = Some(
+ ParquetColumn(
+ sparkType = StructType.fromAttributes(
+ ScalaReflection.attributesFor[Tuple1[Long]]),
+ descriptor = None,
+ repetitionLevel = 0,
+ definitionLevel = 0,
+ required = false,
+ path = Seq(),
+ children = Seq(
+ primitiveParquetColumn(LongType, PrimitiveTypeName.INT64, Repetition.REQUIRED,
+ 0, 0, Seq("_1"), logicalTypeAnnotation = Some(LogicalTypeAnnotation.intType(64, false)))
+ ))),
+ nanosAsLong = true
+ )
+
testSchemaInference[(Boolean, Int, Long, Float, Double, Array[Byte])](
"basic types",
"""
@@ -1027,6 +1067,24 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
}
}
+ test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with nanosAsLong=true)") {
+ val tsAttribute = "birthday"
+ withSQLConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key -> "true") {
+ val testDataPath = testFile("test-data/timestamp-nanos.parquet")
+ val data = spark.read.parquet(testDataPath).select(tsAttribute)
+ assert(data.schema.fields.head.dataType == LongType)
+ assert(data.orderBy(desc(tsAttribute)).take(1).head.getAs[Long](0) == 1668537129123534758L)
+ }
+ }
+
+ test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with default nanosAsLong=false)") {
+ val testDataPath = testFile("test-data/timestamp-nanos.parquet")
+ val e = intercept[org.apache.spark.SparkException] {
+ spark.read.parquet(testDataPath).collect()
+ }
+ assert(e.getMessage.contains("Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))."))
+ }
+
// =======================================================
// Tests for converting Parquet LIST to Catalyst ArrayType
// =======================================================
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org