Posted to commits@spark.apache.org by gu...@apache.org on 2023/02/08 02:07:39 UTC
[spark] branch branch-3.3 updated: [SPARK-40819][SQL][3.3] Timestamp nanos behaviour regression
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 3ec9b05a23c [SPARK-40819][SQL][3.3] Timestamp nanos behaviour regression
3ec9b05a23c is described below
commit 3ec9b05a23cc780438772d847b2fab19aab2d60a
Author: alfreddavidson <al...@gmail.com>
AuthorDate: Wed Feb 8 11:07:25 2023 +0900
[SPARK-40819][SQL][3.3] Timestamp nanos behaviour regression
As per HyukjinKwon's request on https://github.com/apache/spark/pull/38312 to backport the fix into 3.3.
### What changes were proposed in this pull request?
Handle `TimeUnit.NANOS` for Parquet `Timestamps`, addressing a behaviour regression introduced in 3.2.
### Why are the changes needed?
Since version 3.2, reading Parquet files that contain attributes with type `TIMESTAMP(NANOS,true)` has not been possible, as `ParquetSchemaConverter` fails with:
```
Caused by: org.apache.spark.sql.AnalysisException: Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))
```
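For context, a minimal reproduction sketch, mirroring the regression test included below (the file path is hypothetical):
```scala
// Reading any Parquet file with a TIMESTAMP(NANOS,true) column
// fails during schema conversion on Spark 3.2+ without this fix.
spark.read.parquet("/tmp/timestamp-nanos.parquet").collect()
// => Caused by: org.apache.spark.sql.AnalysisException:
//    Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))
```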
https://issues.apache.org/jira/browse/SPARK-34661 introduced a change that matches on the `LogicalTypeAnnotation`, which only covers the `Timestamp` cases for `TimeUnit.MILLIS` and `TimeUnit.MICROS`, meaning `TimeUnit.NANOS` falls through to `illegalType()`.
Prior to 3.2, the matching used the `originalType`, which for `TIMESTAMP(NANOS,true)` returned `null` and therefore resulted in a `LongType`. The proposed change is to also consider `TimeUnit.NANOS` and return `LongType`, making the behaviour the same as before.
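To illustrate that distinction, a small standalone sketch against the parquet-mr schema API (added here for illustration only, not part of this change): a nanos timestamp carries a `TimestampLogicalTypeAnnotation`, but has no legacy `OriginalType`, which is why the pre-3.2 code path fell back to `LongType`.
```scala
import org.apache.parquet.schema.LogicalTypeAnnotation
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Types

// Build an INT64 field annotated as TIMESTAMP(NANOS,true).
val field = Types.required(PrimitiveTypeName.INT64)
  .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.NANOS))
  .named("ts")

println(field.getLogicalTypeAnnotation) // TIMESTAMP(NANOS,true)
println(field.getOriginalType)          // null -- no legacy equivalent for nanos
```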
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a unit test covering this scenario.
Also deployed internally to read Parquet files that contain `TIMESTAMP(NANOS,true)`.
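For illustration, a minimal sketch of opting into the restored behaviour (the file path is hypothetical; the config key is the one added by this change):
```scala
// Opt into the legacy mapping so TIMESTAMP(NANOS,true) columns are
// read back as SQL long values instead of failing schema conversion.
spark.conf.set("spark.sql.legacy.parquet.nanosAsLong", "true")

// Hypothetical path; any file with a nanos timestamp column works.
val df = spark.read.parquet("/tmp/timestamp-nanos.parquet")
df.printSchema() // the nanos column is exposed as LongType
```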
Closes #39904 from awdavidson/ts-nanos-fix-3.3.
Lead-authored-by: alfreddavidson <al...@gmail.com>
Co-authored-by: awdavidson <54...@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 9 +++
.../parquet/SpecificParquetRecordReaderBase.java | 2 +
.../datasources/parquet/ParquetFileFormat.scala | 14 +++-
.../parquet/ParquetSchemaConverter.scala | 15 ++++-
.../datasources/v2/parquet/ParquetScan.scala | 4 ++
.../resources/test-data/timestamp-nanos.parquet | Bin 0 -> 784 bytes
.../datasources/parquet/ParquetSchemaSuite.scala | 72 +++++++++++++++++++--
7 files changed, 104 insertions(+), 12 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d9e38ea9258..cab2aad08cc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3459,6 +3459,13 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val LEGACY_PARQUET_NANOS_AS_LONG = buildConf("spark.sql.legacy.parquet.nanosAsLong")
+ .internal()
+ .doc("When true, the Parquet's nanos precision timestamps are converted to SQL long values.")
+ .version("3.2.3")
+ .booleanConf
+ .createWithDefault(false)
+
val PARQUET_INT96_REBASE_MODE_IN_WRITE =
buildConf("spark.sql.parquet.int96RebaseModeInWrite")
.internal()
@@ -4525,6 +4532,8 @@ class SQLConf extends Serializable with Logging {
def ignoreMissingParquetFieldId: Boolean = getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)
+ def legacyParquetNanosAsLong: Boolean = getConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG)
+
def useV1Command: Boolean = getConf(SQLConf.LEGACY_USE_V1_COMMAND)
def histogramNumericPropagateInputType: Boolean =
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 48016c3fdc0..49dbdf87ae0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -149,6 +149,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
+ config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
this.file = new Path(path);
long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
@@ -199,6 +200,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
+ config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
this.parquetColumn = new ParquetToSparkSchemaConverter(config)
.convertParquetColumn(requestedSchema, Option.empty());
this.sparkSchema = (StructType) parquetColumn.sparkType();
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index f66434d3caf..e63881422aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -123,6 +123,10 @@ class ParquetFileFormat
SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
sparkSession.sessionState.conf.parquetFieldIdWriteEnabled.toString)
+ conf.set(
+ SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+ sparkSession.sessionState.conf.legacyParquetNanosAsLong.toString)
+
// Sets compression scheme
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
@@ -239,6 +243,9 @@ class ParquetFileFormat
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+ hadoopConf.setBoolean(
+ SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+ sparkSession.sessionState.conf.legacyParquetNanosAsLong)
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
@@ -440,7 +447,8 @@ object ParquetFileFormat extends Logging {
val converter = new ParquetToSparkSchemaConverter(
sparkSession.sessionState.conf.isParquetBinaryAsString,
- sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+ sparkSession.sessionState.conf.isParquetINT96AsTimestamp,
+ nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong)
val seen = mutable.HashSet[String]()
val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
@@ -536,12 +544,14 @@ object ParquetFileFormat extends Logging {
sparkSession: SparkSession): Option[StructType] = {
val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
+ val nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong
val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {
// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
val converter = new ParquetToSparkSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
- assumeInt96IsTimestamp = assumeInt96IsTimestamp)
+ assumeInt96IsTimestamp = assumeInt96IsTimestamp,
+ nanosAsLong = nanosAsLong)
readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
.map(ParquetFileFormat.readSchemaFromFooter(_, converter))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 3419bf15f8e..d9ccb524777 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -49,21 +49,25 @@ import org.apache.spark.util.Utils
* [[TimestampType]] fields.
* @param caseSensitive Whether use case sensitive analysis when comparing Spark catalyst read
* schema with Parquet schema
+ * @param nanosAsLong Whether timestamps with nanos are converted to long.
*/
class ParquetToSparkSchemaConverter(
assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
- caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get) {
+ caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get,
+ nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get) {
def this(conf: SQLConf) = this(
assumeBinaryIsString = conf.isParquetBinaryAsString,
assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
- caseSensitive = conf.caseSensitiveAnalysis)
+ caseSensitive = conf.caseSensitiveAnalysis,
+ nanosAsLong = conf.legacyParquetNanosAsLong)
def this(conf: Configuration) = this(
assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
- caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean)
+ caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean,
+ conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean)
/**
@@ -257,6 +261,11 @@ class ParquetToSparkSchemaConverter(
// SPARK-38829: Remove TimestampNTZ type support in Parquet for Spark 3.3
if (Utils.isTesting) TimestampNTZType else TimestampType
}
+ // SPARK-40819: NANOS are not supported as a Timestamp, convert to LongType without
+ // timezone awareness to address behaviour regression introduced by SPARK-34661
+ case timestamp: TimestampLogicalTypeAnnotation
+ if timestamp.getUnit == TimeUnit.NANOS && nanosAsLong =>
+ LongType
case _ => illegalType()
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index 99632d79cd8..0fec27505d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -86,6 +86,10 @@ case class ParquetScan(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+ hadoopConf.setBoolean(
+ SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+ sparkSession.sessionState.conf.legacyParquetNanosAsLong)
+
val broadcastedConf = sparkSession.sparkContext.broadcast(
new SerializableConfiguration(hadoopConf))
val sqlConf = sparkSession.sessionState.conf
diff --git a/sql/core/src/test/resources/test-data/timestamp-nanos.parquet b/sql/core/src/test/resources/test-data/timestamp-nanos.parquet
new file mode 100644
index 00000000000..962aa909b82
Binary files /dev/null and b/sql/core/src/test/resources/test-data/timestamp-nanos.parquet differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index d0228d7bdf9..a62dd53342a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
+import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType._
import org.apache.spark.sql.test.SharedSparkSession
@@ -46,7 +47,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
binaryAsString: Boolean,
int96AsTimestamp: Boolean,
writeLegacyParquetFormat: Boolean,
- expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+ expectedParquetColumn: Option[ParquetColumn] = None,
+ nanosAsLong: Boolean = false): Unit = {
testSchema(
testName,
StructType.fromAttributes(ScalaReflection.attributesFor[T]),
@@ -54,7 +56,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
binaryAsString,
int96AsTimestamp,
writeLegacyParquetFormat,
- expectedParquetColumn = expectedParquetColumn)
+ expectedParquetColumn = expectedParquetColumn,
+ nanosAsLong = nanosAsLong)
}
protected def testParquetToCatalyst(
@@ -65,11 +68,13 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
int96AsTimestamp: Boolean,
caseSensitive: Boolean = false,
sparkReadSchema: Option[StructType] = None,
- expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+ expectedParquetColumn: Option[ParquetColumn] = None,
+ nanosAsLong: Boolean = false): Unit = {
val converter = new ParquetToSparkSchemaConverter(
assumeBinaryIsString = binaryAsString,
assumeInt96IsTimestamp = int96AsTimestamp,
- caseSensitive = caseSensitive)
+ caseSensitive = caseSensitive,
+ nanosAsLong = nanosAsLong)
test(s"sql <= parquet: $testName") {
val actualParquetColumn = converter.convertParquetColumn(
@@ -117,7 +122,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
writeLegacyParquetFormat: Boolean,
outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
SQLConf.ParquetOutputTimestampType.INT96,
- expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+ expectedParquetColumn: Option[ParquetColumn] = None,
+ nanosAsLong: Boolean = false): Unit = {
testCatalystToParquet(
testName,
@@ -132,7 +138,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
parquetSchema,
binaryAsString,
int96AsTimestamp,
- expectedParquetColumn = expectedParquetColumn)
+ expectedParquetColumn = expectedParquetColumn,
+ nanosAsLong = nanosAsLong)
}
protected def compareParquetColumn(actual: ParquetColumn, expected: ParquetColumn): Unit = {
@@ -147,7 +154,14 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
val expectedDesc = expected.descriptor.get
assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
- assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
+
+ actualDesc.getPrimitiveType.getLogicalTypeAnnotation match {
+ case timestamp: LogicalTypeAnnotation.TimestampLogicalTypeAnnotation
+ if timestamp.getUnit == LogicalTypeAnnotation.TimeUnit.NANOS =>
+ assert(actual.sparkType == expected.sparkType)
+ case _ =>
+ assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
+ }
}
assert(actual.repetitionLevel == expected.repetitionLevel, "repetition level mismatch: " +
@@ -195,6 +209,32 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
}
class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
+ testSchemaInference[Tuple1[Long]](
+ "timestamp nanos",
+ """
+ |message root {
+ | required int64 _1 (TIMESTAMP(NANOS,true));
+ |}
+ """.stripMargin,
+ binaryAsString = false,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true,
+ expectedParquetColumn = Some(
+ ParquetColumn(
+ sparkType = StructType.fromAttributes(
+ ScalaReflection.attributesFor[Tuple1[Long]]),
+ descriptor = None,
+ repetitionLevel = 0,
+ definitionLevel = 0,
+ required = false,
+ path = Seq(),
+ children = Seq(
+ primitiveParquetColumn(LongType, PrimitiveTypeName.INT64, Repetition.REQUIRED,
+ 0, 0, Seq("_1"), logicalTypeAnnotation = Some(LogicalTypeAnnotation.intType(64, false)))
+ ))),
+ nanosAsLong = true
+ )
+
testSchemaInference[(Boolean, Int, Long, Float, Double, Array[Byte])](
"basic types",
"""
@@ -1005,6 +1045,24 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
}
}
+ test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with nanosAsLong=true)") {
+ val tsAttribute = "birthday"
+ withSQLConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key -> "true") {
+ val testDataPath = testFile("test-data/timestamp-nanos.parquet")
+ val data = spark.read.parquet(testDataPath).select(tsAttribute)
+ assert(data.schema.fields.head.dataType == LongType)
+ assert(data.orderBy(desc(tsAttribute)).take(1).head.getAs[Long](0) == 1668537129123534758L)
+ }
+ }
+
+ test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with default nanosAsLong=false)") {
+ val testDataPath = testFile("test-data/timestamp-nanos.parquet")
+ val e = intercept[SparkException] {
+ spark.read.parquet(testDataPath).collect()
+ }
+ assert(e.getMessage.contains("Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))"))
+ }
+
// =======================================================
// Tests for converting Parquet LIST to Catalyst ArrayType
// =======================================================