You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/02/08 02:08:11 UTC
[spark] branch branch-3.2 updated: [SPARK-40819][SQL][3.2] Timestamp nanos behaviour regression
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new f82176cd019 [SPARK-40819][SQL][3.2] Timestamp nanos behaviour regression
f82176cd019 is described below
commit f82176cd019de263ae08ba493ab4d1fd43305334
Author: alfreddavidson <al...@gmail.com>
AuthorDate: Wed Feb 8 11:07:59 2023 +0900
[SPARK-40819][SQL][3.2] Timestamp nanos behaviour regression
As per HyukjinKwon request on https://github.com/apache/spark/pull/38312 to backport fix into 3.2
### What changes were proposed in this pull request?
Handle `TimeUnit.NANOS` for parquet `Timestamps` addressing a regression in behaviour since 3.2
### Why are the changes needed?
Since version 3.2, reading parquet files that contain attributes with type `TIMESTAMP(NANOS,true)` is not possible, as ParquetSchemaConverter throws
```
Caused by: org.apache.spark.sql.AnalysisException: Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))
```
https://issues.apache.org/jira/browse/SPARK-34661 introduced a change matching on the `LogicalTypeAnnotation` which only covers Timestamp cases for `TimeUnit.MILLIS` and `TimeUnit.MICROS` meaning `TimeUnit.NANOS` would return `illegalType()`
Prior to 3.2 the matching used the `originalType`, which for `TIMESTAMP(NANOS,true)` returns `null` and therefore resulted in a `LongType`. The change proposed is to consider `TimeUnit.NANOS` and return `LongType`, making the behaviour the same as before.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit test covering this scenario.
Internally deployed to read parquet files that contain `TIMESTAMP(NANOS,true)`
Closes #39905 from awdavidson/ts-nanos-fix-3.2.
Authored-by: alfreddavidson <al...@gmail.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 9 ++++
.../parquet/SpecificParquetRecordReaderBase.java | 1 +
.../datasources/parquet/ParquetFileFormat.scala | 14 +++++-
.../parquet/ParquetSchemaConverter.scala | 15 +++++--
.../datasources/v2/parquet/ParquetScan.scala | 4 ++
.../resources/test-data/timestamp-nanos.parquet | Bin 0 -> 784 bytes
.../datasources/parquet/ParquetSchemaSuite.scala | 50 ++++++++++++++++++---
7 files changed, 82 insertions(+), 11 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b28bfeee245..35a399542fc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3163,6 +3163,13 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val LEGACY_PARQUET_NANOS_AS_LONG = buildConf("spark.sql.legacy.parquet.nanosAsLong")
+ .internal()
+ .doc("When true, the Parquet's nanos precision timestamps are converted to SQL long values.")
+ .version("3.2.3")
+ .booleanConf
+ .createWithDefault(false)
+
val PARQUET_INT96_REBASE_MODE_IN_WRITE =
buildConf("spark.sql.parquet.int96RebaseModeInWrite")
.internal()
@@ -4145,6 +4152,8 @@ class SQLConf extends Serializable with Logging {
def maxConcurrentOutputFileWriters: Int = getConf(SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS)
+ def legacyParquetNanosAsLong: Boolean = getConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG)
+
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index c2ffe2129d3..30fc755d31d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -152,6 +152,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
Configuration config = new Configuration();
config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
+ config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
this.file = new Path(path);
long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index e5d33b84bf0..ce44808e3a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -119,6 +119,10 @@ class ParquetFileFormat
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
sparkSession.sessionState.conf.parquetOutputTimestampType.toString)
+ conf.set(
+ SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+ sparkSession.sessionState.conf.legacyParquetNanosAsLong.toString)
+
// Sets compression scheme
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
@@ -227,6 +231,9 @@ class ParquetFileFormat
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+ hadoopConf.setBoolean(
+ SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+ sparkSession.sessionState.conf.legacyParquetNanosAsLong)
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
@@ -421,7 +428,8 @@ object ParquetFileFormat extends Logging {
val converter = new ParquetToSparkSchemaConverter(
sparkSession.sessionState.conf.isParquetBinaryAsString,
- sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+ sparkSession.sessionState.conf.isParquetINT96AsTimestamp,
+ nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong)
val seen = mutable.HashSet[String]()
val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
@@ -517,12 +525,14 @@ object ParquetFileFormat extends Logging {
sparkSession: SparkSession): Option[StructType] = {
val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
+ val nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong
val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {
// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
val converter = new ParquetToSparkSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
- assumeInt96IsTimestamp = assumeInt96IsTimestamp)
+ assumeInt96IsTimestamp = assumeInt96IsTimestamp,
+ nanosAsLong = nanosAsLong)
readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
.map(ParquetFileFormat.readSchemaFromFooter(_, converter))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index f3ecd790761..74a220103be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -43,18 +43,22 @@ import org.apache.spark.sql.types._
* [[StringType]] fields.
* @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL
* [[TimestampType]] fields.
+ * @param nanosAsLong Whether timestamps with nanos are converted to long.
*/
class ParquetToSparkSchemaConverter(
assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
- assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get) {
+ assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
+ nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get) {
def this(conf: SQLConf) = this(
assumeBinaryIsString = conf.isParquetBinaryAsString,
- assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp)
+ assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
+ nanosAsLong = conf.legacyParquetNanosAsLong)
def this(conf: Configuration) = this(
assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
- assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean)
+ assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
+ nanosAsLong = conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean)
/**
@@ -171,6 +175,11 @@ class ParquetToSparkSchemaConverter(
TimestampType
case timestamp: TimestampLogicalTypeAnnotation if timestamp.getUnit == TimeUnit.MILLIS =>
TimestampType
+ // SPARK-40819: NANOS are not supported as a Timestamp, convert to LongType without
+ // timezone awareness to address behaviour regression introduced by SPARK-34661
+ case timestamp: TimestampLogicalTypeAnnotation
+ if timestamp.getUnit == TimeUnit.NANOS && nanosAsLong =>
+ LongType
case _ => illegalType()
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index 60573ba10cc..5f67554605c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -76,6 +76,10 @@ case class ParquetScan(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+ hadoopConf.setBoolean(
+ SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+ sparkSession.sessionState.conf.legacyParquetNanosAsLong)
+
val broadcastedConf = sparkSession.sparkContext.broadcast(
new SerializableConfiguration(hadoopConf))
val sqlConf = sparkSession.sessionState.conf
diff --git a/sql/core/src/test/resources/test-data/timestamp-nanos.parquet b/sql/core/src/test/resources/test-data/timestamp-nanos.parquet
new file mode 100644
index 00000000000..962aa909b82
Binary files /dev/null and b/sql/core/src/test/resources/test-data/timestamp-nanos.parquet differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index fcc08ee16e8..feb756b64ce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
+import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -41,14 +42,16 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
messageType: String,
binaryAsString: Boolean,
int96AsTimestamp: Boolean,
- writeLegacyParquetFormat: Boolean): Unit = {
+ writeLegacyParquetFormat: Boolean,
+ nanosAsLong: Boolean = false): Unit = {
testSchema(
testName,
StructType.fromAttributes(ScalaReflection.attributesFor[T]),
messageType,
binaryAsString,
int96AsTimestamp,
- writeLegacyParquetFormat)
+ writeLegacyParquetFormat,
+ nanosAsLong = nanosAsLong)
}
protected def testParquetToCatalyst(
@@ -56,10 +59,12 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
sqlSchema: StructType,
parquetSchema: String,
binaryAsString: Boolean,
- int96AsTimestamp: Boolean): Unit = {
+ int96AsTimestamp: Boolean,
+ nanosAsLong: Boolean = false): Unit = {
val converter = new ParquetToSparkSchemaConverter(
assumeBinaryIsString = binaryAsString,
- assumeInt96IsTimestamp = int96AsTimestamp)
+ assumeInt96IsTimestamp = int96AsTimestamp,
+ nanosAsLong = nanosAsLong)
test(s"sql <= parquet: $testName") {
val actual = converter.convert(MessageTypeParser.parseMessageType(parquetSchema))
@@ -100,7 +105,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
int96AsTimestamp: Boolean,
writeLegacyParquetFormat: Boolean,
outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
- SQLConf.ParquetOutputTimestampType.INT96): Unit = {
+ SQLConf.ParquetOutputTimestampType.INT96,
+ nanosAsLong: Boolean = false): Unit = {
testCatalystToParquet(
testName,
@@ -114,11 +120,25 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
sqlSchema,
parquetSchema,
binaryAsString,
- int96AsTimestamp)
+ int96AsTimestamp,
+ nanosAsLong = nanosAsLong)
}
}
class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
+ testSchemaInference[Tuple1[Long]](
+ "timestamp nanos",
+ """
+ |message root {
+ | required int64 _1 (TIMESTAMP(NANOS,true));
+ |}
+ """.stripMargin,
+ binaryAsString = false,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = true,
+ nanosAsLong = true
+ )
+
testSchemaInference[(Boolean, Int, Long, Float, Double, Array[Byte])](
"basic types",
"""
@@ -456,6 +476,24 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
}
}
+ test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with nanosAsLong=true)") {
+ val tsAttribute = "birthday"
+ withSQLConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key -> "true") {
+ val testDataPath = testFile("test-data/timestamp-nanos.parquet")
+ val data = spark.read.parquet(testDataPath).select(tsAttribute)
+ assert(data.schema.fields.head.dataType == LongType)
+ assert(data.orderBy(desc(tsAttribute)).take(1).head.getAs[Long](0) == 1668537129123534758L)
+ }
+ }
+
+ test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with default nanosAsLong=false)") {
+ val testDataPath = testFile("test-data/timestamp-nanos.parquet")
+ val e = intercept[SparkException] {
+ spark.read.parquet(testDataPath).collect()
+ }
+ assert(e.getCause.getMessage.contains("Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))"))
+ }
+
// =======================================================
// Tests for converting Parquet LIST to Catalyst ArrayType
// =======================================================
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org